2013-07-16 20:21:33 -04:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
# (the "License"); you may not use this file except in compliance with
|
|
|
|
# the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
#
|
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
"""
|
|
|
|
PySpark supports custom serializers for transferring data; this can improve
|
|
|
|
performance.
|
|
|
|
|
|
|
|
By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
|
|
|
|
C{cPickle} serializer, which can serialize nearly any Python object.
|
|
|
|
Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
|
|
|
|
faster.
|
|
|
|
|
|
|
|
The serializer is chosen when creating L{SparkContext}:
|
|
|
|
|
|
|
|
>>> from pyspark.context import SparkContext
|
|
|
|
>>> from pyspark.serializers import MarshalSerializer
|
|
|
|
>>> sc = SparkContext('local', 'test', serializer=MarshalSerializer())
|
|
|
|
>>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
|
|
|
|
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
|
|
|
|
>>> sc.stop()
|
|
|
|
|
2014-11-04 02:56:14 -05:00
|
|
|
PySpark serializes objects in batches; by default, the batch size is chosen based
|
|
|
|
on the size of objects and is also configurable via SparkContext's C{batchSize} parameter:
|
2013-11-05 20:52:39 -05:00
|
|
|
|
|
|
|
>>> sc = SparkContext('local', 'test', batchSize=2)
|
|
|
|
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
|
|
|
|
|
|
|
|
Behind the scenes, this creates a JavaRDD with four partitions, each of
|
|
|
|
which contains two batches of two objects:
|
|
|
|
|
|
|
|
>>> rdd.glom().collect()
|
|
|
|
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
|
|
|
|
>>> rdd._jrdd.count()
|
|
|
|
8L
|
|
|
|
>>> sc.stop()
|
|
|
|
"""
|
|
|
|
|
2015-04-16 19:20:57 -04:00
|
|
|
import sys
|
|
|
|
from itertools import chain, product
|
2013-11-05 20:52:39 -05:00
|
|
|
import marshal
|
|
|
|
import struct
|
2014-08-04 15:13:41 -04:00
|
|
|
import types
|
|
|
|
import collections
|
2014-08-16 19:59:34 -04:00
|
|
|
import zlib
|
2014-09-19 18:01:11 -04:00
|
|
|
import itertools
|
2014-08-04 15:13:41 -04:00
|
|
|
|
2015-04-16 19:20:57 -04:00
|
|
|
if sys.version < '3':
|
|
|
|
import cPickle as pickle
|
|
|
|
protocol = 2
|
|
|
|
from itertools import izip as zip
|
|
|
|
else:
|
|
|
|
import pickle
|
|
|
|
protocol = 3
|
|
|
|
xrange = range
|
|
|
|
|
2013-11-10 15:58:28 -05:00
|
|
|
from pyspark import cloudpickle
|
2013-11-05 20:52:39 -05:00
|
|
|
|
|
|
|
|
2014-11-04 02:56:14 -05:00
|
|
|
# Public names exported by ``from pyspark.serializers import *``.
__all__ = ["PickleSerializer", "MarshalSerializer", "UTF8Deserializer"]
|
2012-08-10 04:10:02 -04:00
|
|
|
|
|
|
|
|
2013-11-03 00:13:18 -04:00
|
|
|
class SpecialLengths(object):
    """
    Negative sentinel values that can be read from a frame's length field
    in place of a real (non-negative) payload length.
    """

    # Marks the end of a data section; FramedSerializer._read_with_length
    # turns this into EOFError.
    END_OF_DATA_SECTION = -1
    # Presumably signals that a Python exception follows -- confirm with
    # the reader on the JVM side.
    PYTHON_EXCEPTION_THROWN = -2
    # Presumably precedes timing/metrics data -- confirm with callers.
    TIMING_DATA = -3
    # Presumably marks the end of the whole stream -- confirm with callers.
    END_OF_STREAM = -4
    # Presumably encodes a null/None record -- confirm with callers.
    NULL = -5
|
2013-11-03 00:13:18 -04:00
|
|
|
|
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
class Serializer(object):
    """
    Base class for PySpark serializers.

    Concrete subclasses implement ``dump_stream`` and ``load_stream``.
    Equality, ``repr`` and hashing have default implementations based on
    the concrete class and its instance attributes.
    """

    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        """
        Read objects from ``stream``; the base implementation simply
        delegates to ``load_stream``. (Presumably overridden by batching
        serializers to return whole batches -- confirm in subclasses.)
        """
        return self.load_stream(stream)

    def __eq__(self, other):
        # Two serializers are "equal" when output written by one can be
        # deserialized by the other. This default covers the simple case:
        # an instance of this class with identical instance attributes.
        # Subclasses should override it as appropriate.
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        # Defined explicitly so that ``!=`` mirrors ``__eq__`` on Python 2.
        equal = self.__eq__(other)
        return not equal

    def __repr__(self):
        # Instance attributes are intentionally left out: e.g. "Serializer()".
        return "{0}()".format(self.__class__.__name__)

    def __hash__(self):
        # Hash the textual form, which by default is just the class name,
        # so instances that compare equal also hash equal.
        text = str(self)
        return hash(text)
|
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
|
|
|
|
class FramedSerializer(Serializer):
|
2014-08-06 15:58:24 -04:00
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
"""
|
|
|
|
Serializer that writes objects as a stream of (length, data) pairs,
|
|
|
|
where C{length} is a 32-bit integer and data is C{length} bytes.
|
|
|
|
"""
|
|
|
|
|
2014-04-05 23:52:05 -04:00
|
|
|
def __init__(self):
|
|
|
|
# On Python 2.6, we can't write bytearrays to streams, so we need to convert them
|
|
|
|
# to strings first. Check if the version number is that old.
|
|
|
|
self._only_write_strings = sys.version_info[0:2] <= (2, 6)
|
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
def dump_stream(self, iterator, stream):
    """
    Write each object from ``iterator`` to ``stream`` as its own
    (length, data) frame, in iteration order.
    """
    for item in iterator:
        self._write_with_length(item, stream)
|
|
|
|
|
|
|
|
def load_stream(self, stream):
    """
    Lazily yield objects read frame by frame from ``stream``.

    Iteration ends cleanly as soon as ``_read_with_length`` raises
    EOFError; any other exception propagates to the caller.
    """
    try:
        while True:
            yield self._read_with_length(stream)
    except EOFError:
        return
|
|
|
|
|
|
|
|
def _write_with_length(self, obj, stream):
    """
    Serialize ``obj`` with ``self.dumps`` and write it to ``stream`` as a
    single (length, data) frame.

    :param obj: the object to serialize.
    :param stream: writable stream receiving the length prefix and payload.
    :raises ValueError: if ``self.dumps`` returns None, or if the payload is
        too large to be described by the 32-bit length prefix.
    """
    serialized = self.dumps(obj)
    if serialized is None:
        raise ValueError("serialized value should not be None")
    length = len(serialized)
    # The length prefix is read back as a signed 32-bit int (negative values
    # are reserved for SpecialLengths markers), so the largest payload that
    # can be framed is 2**31 - 1 bytes; reject anything from 2**31 upward.
    if length >= (1 << 31):
        raise ValueError("can not serialize object larger than 2G")
    write_int(length, stream)
    if self._only_write_strings:
        # Python 2.6 cannot write bytearrays to streams; convert to str.
        stream.write(str(serialized))
    else:
        stream.write(serialized)
|
2013-11-05 20:52:39 -05:00
|
|
|
|
|
|
|
def _read_with_length(self, stream):
    """
    Read one length-prefixed frame from ``stream`` and deserialize it.

    The frame length is obtained with ``read_int``; two special length
    values are recognised before any payload is read:

    * ``SpecialLengths.END_OF_DATA_SECTION`` -> raises ``EOFError``
    * ``SpecialLengths.NULL`` -> returns ``None`` (no payload follows)

    Otherwise exactly ``length`` bytes are read and passed to
    ``self.loads``. A short read (fewer bytes than announced) also raises
    ``EOFError``.
    """
    size = read_int(stream)
    if size == SpecialLengths.END_OF_DATA_SECTION:
        raise EOFError
    if size == SpecialLengths.NULL:
        return None
    payload = stream.read(size)
    # A truncated payload means the stream ended mid-frame.
    if len(payload) < size:
        raise EOFError
    return self.loads(payload)
|
2013-11-05 20:52:39 -05:00
|
|
|
|
2013-11-10 20:48:27 -05:00
|
|
|
def dumps(self, obj):
    """
    Convert ``obj`` into a byte array; concrete serializers override this.

    When batching is in effect, ``obj`` is an array of objects rather
    than a single object.
    """
    raise NotImplementedError()
|
|
|
|
|
2013-11-10 20:48:27 -05:00
|
|
|
def loads(self, obj):
    """
    Rebuild an object from the byte array ``obj``; concrete serializers
    override this.
    """
    raise NotImplementedError()
|
|
|
|
|
|
|
|
|
|
|
|
class BatchedSerializer(Serializer):

    """
    Group a stream of objects into lists ("batches") and hand those lists
    to the wrapped serializer; loading flattens the batches back into a
    single stream of objects.
    """

    # batchSize sentinel: put the whole input into a single batch.
    UNLIMITED_BATCH_SIZE = -1
    # batchSize sentinel used by subclasses that size batches themselves.
    UNKNOWN_BATCH_SIZE = 0

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        # Unlimited: everything becomes one batch.
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
            return

        # Sized, sliceable input: cut it into slices directly.
        if hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"):
            total = len(iterator)
            for start in xrange(0, total, self.batchSize):
                yield iterator[start: start + self.batchSize]
            return

        # Any other iterable: buffer items until a batch is full.
        pending = []
        for element in iterator:
            pending.append(element)
            if len(pending) == self.batchSize:
                yield pending
                pending = []
        if pending:
            yield pending

    def dump_stream(self, iterator, stream):
        batches = self._batched(iterator)
        self.serializer.dump_stream(batches, stream)

    def load_stream(self, stream):
        batches = self._load_stream_without_unbatching(stream)
        return chain.from_iterable(batches)

    def _load_stream_without_unbatching(self, stream):
        # Batches exactly as the wrapped serializer produces them.
        return self.serializer.load_stream(stream)

    def __repr__(self):
        return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
|
2013-11-05 20:52:39 -05:00
|
|
|
|
|
|
|
|
[SPARK-3074] [PySpark] support groupByKey() with single huge key
This patch changes groupByKey() to use an external-sort-based approach, so it can support a single huge key.
For example, it can group a dataset that includes one hot key with 40 million values (strings) using 500M of memory for the Python worker, finishing in about 2 minutes (the hash-based approach would need 6G of memory).
During groupByKey(), it first does an in-memory groupBy. If the dataset cannot fit in memory, the data is partitioned by hash. If a partition still cannot fit in memory, it switches to a sort-based groupBy().
Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes #1977 from davies/groupby and squashes the following commits:
af3713a [Davies Liu] make sure it's iterator
67772dd [Davies Liu] fix tests
e78c15c [Davies Liu] address comments
0b0fde8 [Davies Liu] address comments
0dcf320 [Davies Liu] address comments, rollback changes in ResultIterable
e3b8eab [Davies Liu] fix narrow dependency
2a1857a [Davies Liu] typo
d2f053b [Davies Liu] add repr for FlattedValuesSerializer
c6a2f8d [Davies Liu] address comments
9e2df24 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
2b9c261 [Davies Liu] fix typo in comments
70aadcd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
a14b4bd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
ab5515b [Davies Liu] Merge branch 'master' into groupby
651f891 [Davies Liu] simplify GroupByKey
1578f2e [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
1f69f93 [Davies Liu] fix tests
0d3395f [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
341f1e0 [Davies Liu] add comments, refactor
47918b8 [Davies Liu] remove unused code
6540948 [Davies Liu] address comments:
17f4ec6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
4d4bc86 [Davies Liu] bugfix
8ef965e [Davies Liu] Merge branch 'master' into groupby
fbc504a [Davies Liu] Merge branch 'master' into groupby
779ed03 [Davies Liu] fix merge conflict
2c1d05b [Davies Liu] refactor, minor turning
b48cda5 [Davies Liu] Merge branch 'master' into groupby
85138e6 [Davies Liu] Merge branch 'master' into groupby
acd8e1b [Davies Liu] fix memory when groupByKey().count()
905b233 [Davies Liu] Merge branch 'sort' into groupby
1f075ed [Davies Liu] Merge branch 'master' into sort
4b07d39 [Davies Liu] compress the data while spilling
0a081c6 [Davies Liu] Merge branch 'master' into groupby
f157fe7 [Davies Liu] Merge branch 'sort' into groupby
eb53ca6 [Davies Liu] Merge branch 'master' into sort
b2dc3bf [Davies Liu] Merge branch 'sort' into groupby
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
11ba318 [Davies Liu] typo
085aef8 [Davies Liu] Merge branch 'master' into groupby
3ee58e5 [Davies Liu] switch to sort based groupBy, based on size of data
1ea0669 [Davies Liu] choose sort based groupByKey() automatically
b40bae7 [Davies Liu] bugfix
efa23df [Davies Liu] refactor, add spark.shuffle.sort=False
250be4e [Davies Liu] flatten the combined values when dumping into disks
d05060d [Davies Liu] group the same key before shuffle, reduce the comparison during sorting
083d842 [Davies Liu] sorted based groupByKey()
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
2015-04-09 20:07:23 -04:00
|
|
|
class FlattenedValuesSerializer(BatchedSerializer):

    """
    Serialize a stream of (key, values) pairs, splitting every values list
    longer than ``batchSize`` into several (key, chunk) pairs so that the
    emitted records have comparable sizes.
    """

    def __init__(self, serializer, batchSize=10):
        BatchedSerializer.__init__(self, serializer, batchSize)

    def _batched(self, iterator):
        chunk = self.batchSize
        for key, values in iterator:
            offsets = range(0, len(values), chunk)
            for offset in offsets:
                yield key, values[offset:offset + chunk]

    def load_stream(self, stream):
        # The (key, chunk) records are returned as stored, without flattening.
        return self.serializer.load_stream(stream)

    def __repr__(self):
        return "FlattenedValuesSerializer(%s, %d)" % (self.serializer, self.batchSize)
|
[SPARK-3074] [PySpark] support groupByKey() with single huge key
This patch changes groupByKey() to use an external-sort-based approach, so it can support a single huge key.
For example, it can group a dataset that includes one hot key with 40 million values (strings) using 500M of memory for the Python worker, finishing in about 2 minutes (the hash-based approach would need 6G of memory).
During groupByKey(), it first does an in-memory groupBy. If the dataset cannot fit in memory, the data is partitioned by hash. If a partition still cannot fit in memory, it switches to a sort-based groupBy().
Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes #1977 from davies/groupby and squashes the following commits:
af3713a [Davies Liu] make sure it's iterator
67772dd [Davies Liu] fix tests
e78c15c [Davies Liu] address comments
0b0fde8 [Davies Liu] address comments
0dcf320 [Davies Liu] address comments, rollback changes in ResultIterable
e3b8eab [Davies Liu] fix narrow dependency
2a1857a [Davies Liu] typo
d2f053b [Davies Liu] add repr for FlattedValuesSerializer
c6a2f8d [Davies Liu] address comments
9e2df24 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
2b9c261 [Davies Liu] fix typo in comments
70aadcd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
a14b4bd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
ab5515b [Davies Liu] Merge branch 'master' into groupby
651f891 [Davies Liu] simplify GroupByKey
1578f2e [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
1f69f93 [Davies Liu] fix tests
0d3395f [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
341f1e0 [Davies Liu] add comments, refactor
47918b8 [Davies Liu] remove unused code
6540948 [Davies Liu] address comments:
17f4ec6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
4d4bc86 [Davies Liu] bugfix
8ef965e [Davies Liu] Merge branch 'master' into groupby
fbc504a [Davies Liu] Merge branch 'master' into groupby
779ed03 [Davies Liu] fix merge conflict
2c1d05b [Davies Liu] refactor, minor turning
b48cda5 [Davies Liu] Merge branch 'master' into groupby
85138e6 [Davies Liu] Merge branch 'master' into groupby
acd8e1b [Davies Liu] fix memory when groupByKey().count()
905b233 [Davies Liu] Merge branch 'sort' into groupby
1f075ed [Davies Liu] Merge branch 'master' into sort
4b07d39 [Davies Liu] compress the data while spilling
0a081c6 [Davies Liu] Merge branch 'master' into groupby
f157fe7 [Davies Liu] Merge branch 'sort' into groupby
eb53ca6 [Davies Liu] Merge branch 'master' into sort
b2dc3bf [Davies Liu] Merge branch 'sort' into groupby
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
11ba318 [Davies Liu] typo
085aef8 [Davies Liu] Merge branch 'master' into groupby
3ee58e5 [Davies Liu] switch to sort based groupBy, based on size of data
1ea0669 [Davies Liu] choose sort based groupByKey() automatically
b40bae7 [Davies Liu] bugfix
efa23df [Davies Liu] refactor, add spark.shuffle.sort=False
250be4e [Davies Liu] flatten the combined values when dumping into disks
d05060d [Davies Liu] group the same key before shuffle, reduce the comparison during sorting
083d842 [Davies Liu] sorted based groupByKey()
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
2015-04-09 20:07:23 -04:00
|
|
|
|
|
|
|
|
2014-09-19 18:01:11 -04:00
|
|
|
class AutoBatchedSerializer(BatchedSerializer):
    """
    Choose the size of batch automatically based on the size of object.

    Batching starts at one object per batch. After each batch is written,
    the batch size doubles if the serialized batch was smaller than
    ``bestSize`` bytes, and halves (never below 1) if it was larger than
    ten times ``bestSize``.
    """

    def __init__(self, serializer, bestSize=1 << 16):
        BatchedSerializer.__init__(self, serializer, self.UNKNOWN_BATCH_SIZE)
        self.bestSize = bestSize

    def dump_stream(self, iterator, stream):
        batch, best = 1, self.bestSize
        iterator = iter(iterator)
        while True:
            vs = list(itertools.islice(iterator, batch))
            if not vs:
                break

            # Each batch is written as write_int(length) followed by the payload.
            data = self.serializer.dumps(vs)
            write_int(len(data), stream)
            stream.write(data)

            size = len(data)
            if size < best:
                batch *= 2
            elif size > best * 10 and batch > 1:
                # Floor division keeps ``batch`` an int on Python 3; true
                # division would yield a float that islice() rejects.
                batch //= 2

    def __repr__(self):
        return "AutoBatchedSerializer(%s)" % self.serializer
|
2014-09-19 18:01:11 -04:00
|
|
|
|
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
class CartesianDeserializer(FramedSerializer):

    """
    Deserialize the output of JavaRDD cartesian() over two PythonRDDs:
    the stream alternates key chunks and value chunks, and every
    combination of items within a paired chunk is produced.
    """

    def __init__(self, key_ser, val_ser):
        FramedSerializer.__init__(self)
        self.key_ser = key_ser
        self.val_ser = val_ser

    def prepare_keys_values(self, stream):
        # Non-batched serializers yield single items; wrap those in lists
        # so both sides are always handled as sequences.
        keys_batched = isinstance(self.key_ser, BatchedSerializer)
        vals_batched = isinstance(self.val_ser, BatchedSerializer)
        key_chunks = self.key_ser._load_stream_without_unbatching(stream)
        val_chunks = self.val_ser._load_stream_without_unbatching(stream)
        for key_chunk, val_chunk in zip(key_chunks, val_chunks):
            if not keys_batched:
                key_chunk = [key_chunk]
            if not vals_batched:
                val_chunk = [val_chunk]
            yield (key_chunk, val_chunk)

    def load_stream(self, stream):
        for key_chunk, val_chunk in self.prepare_keys_values(stream):
            for combo in product(key_chunk, val_chunk):
                yield combo

    def __repr__(self):
        key_text, val_text = str(self.key_ser), str(self.val_ser)
        return "CartesianDeserializer(%s, %s)" % (key_text, val_text)
|
|
|
|
|
|
|
|
|
2014-03-10 16:27:00 -04:00
|
|
|
class PairDeserializer(CartesianDeserializer):

    """
    Deserialize the output of JavaRDD zip() over two PythonRDDs: items of
    each paired key/value chunk are matched up positionally.
    """

    def load_stream(self, stream):
        for key_chunk, val_chunk in self.prepare_keys_values(stream):
            n_keys, n_vals = len(key_chunk), len(val_chunk)
            # zip() would silently truncate, so mismatched chunks are an error.
            if n_keys != n_vals:
                raise ValueError("Can not deserialize RDD with different number of items"
                                 " in pair: (%d, %d)" % (n_keys, n_vals))
            for pair in zip(key_chunk, val_chunk):
                yield pair

    def __repr__(self):
        key_text, val_text = str(self.key_ser), str(self.val_ser)
        return "PairDeserializer(%s, %s)" % (key_text, val_text)
|
2014-03-10 16:27:00 -04:00
|
|
|
|
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
class NoOpSerializer(FramedSerializer):
    """Framed serializer whose dumps/loads return their argument unchanged."""

    def dumps(self, obj):
        return obj

    def loads(self, obj):
        return obj
|
2013-11-05 20:52:39 -05:00
|
|
|
|
|
|
|
|
2014-08-04 15:13:41 -04:00
|
|
|
# Hook namedtuple, make it picklable
|
|
|
|
|
|
|
|
__cls = {}
|
|
|
|
|
|
|
|
|
|
|
|
def _restore(name, fields, value):
|
|
|
|
""" Restore an object of namedtuple"""
|
|
|
|
k = (name, fields)
|
|
|
|
cls = __cls.get(k)
|
|
|
|
if cls is None:
|
|
|
|
cls = collections.namedtuple(name, fields)
|
|
|
|
__cls[k] = cls
|
|
|
|
return cls(*value)
|
|
|
|
|
|
|
|
|
|
|
|
def _hack_namedtuple(cls):
|
|
|
|
""" Make class generated by namedtuple picklable """
|
|
|
|
name = cls.__name__
|
|
|
|
fields = cls._fields
|
2014-08-06 15:58:24 -04:00
|
|
|
|
2014-08-04 15:13:41 -04:00
|
|
|
def __reduce__(self):
|
|
|
|
return (_restore, (name, fields, tuple(self)))
|
|
|
|
cls.__reduce__ = __reduce__
|
|
|
|
return cls
|
|
|
|
|
|
|
|
|
|
|
|
def _hijack_namedtuple():
    """ Hack namedtuple() to make it picklable """
    # hijack only one time
    if hasattr(collections.namedtuple, "__hijack"):
        return

    global _old_namedtuple  # or it will put in closure

    def _copy_func(f):
        fn = types.FunctionType(f.__code__, f.__globals__, f.__name__,
                                f.__defaults__, f.__closure__)
        # FunctionType() does not carry over defaults of keyword-only
        # parameters. namedtuple() declares keyword-only parameters on
        # Python 3.6+, so without this the copy would fail with
        # "missing required keyword-only arguments". (Python 2 functions
        # have no __kwdefaults__, hence getattr.)
        kwdefaults = getattr(f, "__kwdefaults__", None)
        if kwdefaults:
            fn.__kwdefaults__ = dict(kwdefaults)
        return fn

    _old_namedtuple = _copy_func(collections.namedtuple)

    def namedtuple(*args, **kwargs):
        cls = _old_namedtuple(*args, **kwargs)
        return _hack_namedtuple(cls)

    # replace namedtuple with new one: the new code object runs with
    # collections' globals, so the names it uses are injected there.
    collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
    collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
    collections.namedtuple.__code__ = namedtuple.__code__
    collections.namedtuple.__hijack = 1

    # hack the cls already generated by namedtuple
    # those created in other module can be pickled as normal,
    # so only hack those in __main__ module
    # (iterate over a snapshot, as dict.items() returned a list on Python 2)
    for o in list(sys.modules["__main__"].__dict__.values()):
        if (type(o) is type and o.__base__ is tuple
                and hasattr(o, "_fields")
                and "__reduce__" not in o.__dict__):
            _hack_namedtuple(o)  # hack inplace
|
2014-08-04 15:13:41 -04:00
|
|
|
|
|
|
|
|
|
|
|
# Patch collections.namedtuple as a side effect of importing this module,
# so namedtuple classes become picklable via _hack_namedtuple.
_hijack_namedtuple()
|
|
|
|
|
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
class PickleSerializer(FramedSerializer):

    """
    Framed serializer built on Python's pickle module:

        http://docs.python.org/2/library/pickle.html

    It can handle nearly any Python object, but may be slower than the
    more specialized serializers.
    """

    def dumps(self, obj):
        # ``protocol`` is the module-level pickle protocol setting.
        return pickle.dumps(obj, protocol)

    if sys.version < '3':
        def loads(self, obj, encoding=None):
            # ``encoding`` is accepted for API parity but unused on Python 2.
            return pickle.loads(obj)
    else:
        def loads(self, obj, encoding="bytes"):
            return pickle.loads(obj, encoding=encoding)
|
2013-11-05 20:52:39 -05:00
|
|
|
|
2014-07-22 01:30:53 -04:00
|
|
|
|
2013-11-10 15:58:28 -05:00
|
|
|
class CloudPickleSerializer(PickleSerializer):
    """
    PickleSerializer variant that dumps with ``cloudpickle`` (protocol 2);
    loading is inherited from PickleSerializer.
    """

    def dumps(self, obj):
        pickled = cloudpickle.dumps(obj, 2)
        return pickled
|
2013-11-10 15:58:28 -05:00
|
|
|
|
2012-12-26 20:34:24 -05:00
|
|
|
|
2013-11-05 20:52:39 -05:00
|
|
|
class MarshalSerializer(FramedSerializer):

    """
    Framed serializer built on Python's marshal module:

        http://docs.python.org/2/library/marshal.html

    It is faster than PickleSerializer but supports fewer datatypes.
    """

    def loads(self, obj):
        return marshal.loads(obj)

    def dumps(self, obj):
        return marshal.dumps(obj)
|
2014-07-25 01:53:47 -04:00
|
|
|
|
|
|
|
|
|
|
|
class AutoSerializer(FramedSerializer):

    """
    Choose marshal or pickle as serialization protocol automatically.

    Each payload is prefixed with a one-byte tag: b'M' for marshal,
    b'P' for pickle. Marshal is tried first; once marshal fails for any
    object, this instance uses pickle for all later objects.
    """

    def __init__(self):
        FramedSerializer.__init__(self)
        # None until marshal has failed once; then b'P' (pickle only).
        self._type = None

    def dumps(self, obj):
        if self._type is not None:
            return b'P' + pickle.dumps(obj, -1)
        try:
            return b'M' + marshal.dumps(obj)
        except Exception:
            self._type = b'P'
            return b'P' + pickle.dumps(obj, -1)

    def loads(self, obj):
        # Slice instead of index: on Python 3, ``bytes[0]`` is an int and
        # would never compare equal to b'M' / b'P'. On Python 2 both forms
        # give the same one-character string.
        _type = obj[0:1]
        if _type == b'M':
            return marshal.loads(obj[1:])
        elif _type == b'P':
            return pickle.loads(obj[1:])
        else:
            raise ValueError("invalid serialization type: %s" % _type)
|
2012-12-26 20:34:24 -05:00
|
|
|
|
2014-08-16 19:59:34 -04:00
|
|
|
|
2014-11-24 20:17:03 -05:00
|
|
|
class CompressedSerializer(FramedSerializer):
    """
    Wrap a FramedSerializer and zlib-compress its output (level 1);
    loading decompresses before delegating to the wrapped serializer.
    """

    def __init__(self, serializer):
        FramedSerializer.__init__(self)
        assert isinstance(serializer, FramedSerializer), "serializer must be a FramedSerializer"
        self.serializer = serializer

    def dumps(self, obj):
        raw = self.serializer.dumps(obj)
        return zlib.compress(raw, 1)

    def loads(self, obj):
        raw = zlib.decompress(obj)
        return self.serializer.loads(raw)

    def __repr__(self):
        return "CompressedSerializer(%s)" % self.serializer
|
2014-12-16 01:58:26 -05:00
|
|
|
|
2012-12-26 20:34:24 -05:00
|
|
|
|
2014-01-28 22:50:26 -05:00
|
|
|
class UTF8Deserializer(Serializer):

    """
    Read length-prefixed strings (written by String.getBytes) back out of a stream.
    """

    def __init__(self, use_unicode=True):
        # When False, loads() hands back the raw bytes without decoding.
        self.use_unicode = use_unicode

    def loads(self, stream):
        """Read a single record from *stream*.

        The record is an int length (via read_int) followed by that many
        bytes, decoded as UTF-8 when use_unicode is set.

        Returns None when the length is SpecialLengths.NULL.
        :raises EOFError: when the length is SpecialLengths.END_OF_DATA_SECTION.
        """
        size = read_int(stream)
        if size == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        if size == SpecialLengths.NULL:
            return None
        payload = stream.read(size)
        if not self.use_unicode:
            return payload
        return payload.decode("utf-8")

    def load_stream(self, stream):
        """Yield records from *stream* until it is exhausted or an end marker is read."""
        try:
            while True:
                yield self.loads(stream)
        except (struct.error, EOFError):
            # A short length read (struct.error) or an end marker (EOFError)
            # both simply end the iteration.
            return

    def __repr__(self):
        return "UTF8Deserializer(%s)" % self.use_unicode
|
2014-12-16 01:58:26 -05:00
|
|
|
|
2012-08-10 04:10:02 -04:00
|
|
|
|
2012-10-19 13:24:49 -04:00
|
|
|
def read_long(stream):
    """Read a big-endian signed 64-bit integer from *stream*.

    :raises EOFError: if the stream returns no bytes at all.
    :raises struct.error: if fewer than 8 bytes are available.
    """
    raw = stream.read(8)
    if raw:
        (value,) = struct.unpack("!q", raw)
        return value
    raise EOFError
|
|
|
|
|
|
|
|
|
2013-03-10 16:54:46 -04:00
|
|
|
def write_long(value, stream):
    """Write *value* to *stream* as a big-endian signed 64-bit integer."""
    encoded = struct.pack("!q", value)
    stream.write(encoded)
|
|
|
|
|
|
|
|
|
2013-10-04 14:56:47 -04:00
|
|
|
def pack_long(value):
    """Return *value* encoded as 8 big-endian bytes (signed 64-bit)."""
    fmt = "!q"
    return struct.pack(fmt, value)
|
|
|
|
|
|
|
|
|
2012-10-19 13:24:49 -04:00
|
|
|
def read_int(stream):
    """Read a big-endian signed 32-bit integer from *stream*.

    :raises EOFError: if the stream returns no bytes at all.
    :raises struct.error: if fewer than 4 bytes are available.
    """
    raw = stream.read(4)
    if raw:
        (value,) = struct.unpack("!i", raw)
        return value
    raise EOFError
|
|
|
|
|
2013-01-20 04:57:44 -05:00
|
|
|
|
|
|
|
def write_int(value, stream):
    """Write *value* to *stream* as a big-endian signed 32-bit integer."""
    encoded = struct.pack("!i", value)
    stream.write(encoded)
|
|
|
|
|
|
|
|
|
2012-08-25 19:46:07 -04:00
|
|
|
def write_with_length(obj, stream):
    """Write len(*obj*) via write_int, then the contents of *obj*, to *stream*."""
    size = len(obj)
    write_int(size, stream)
    stream.write(obj)
|
2014-11-18 19:17:51 -05:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    import doctest
    import sys
    # testmod() returns (failed, attempted); propagate failures to the exit
    # status so scripted/CI runs of this module notice broken doctests.
    (failure_count, test_count) = doctest.testmod()
    if failure_count:
        sys.exit(-1)
|