#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
|
|
|
|
PySpark supports custom serializers for transferring data; this can improve
|
|
|
|
performance.
|
|
|
|
|
|
|
|
By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
|
|
|
|
C{cPickle} serializer, which can serialize nearly any Python object.
|
|
|
|
Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
|
|
|
|
faster.
|
|
|
|
|
|
|
|
The serializer is chosen when creating L{SparkContext}:
|
|
|
|
|
|
|
|
>>> from pyspark.context import SparkContext
|
|
|
|
>>> from pyspark.serializers import MarshalSerializer
|
|
|
|
>>> sc = SparkContext('local', 'test', serializer=MarshalSerializer())
|
|
|
|
>>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
|
|
|
|
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
|
|
|
|
>>> sc.stop()
|
|
|
|
|
PySpark serializes objects in batches; by default, the batch size is chosen based
on the size of the objects, and it is also configurable via SparkContext's
C{batchSize} parameter:

>>> sc = SparkContext('local', 'test', batchSize=2)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)

Behind the scenes, this creates a JavaRDD with four partitions, each of
which contains two batches of two objects:

>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
8L
>>> sc.stop()
"""

import cPickle
from itertools import chain, izip, product
import marshal
import struct
import sys
import types
import collections
import zlib
import itertools

from pyspark import cloudpickle

__all__ = ["PickleSerializer", "MarshalSerializer", "UTF8Deserializer"]


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4


class Serializer(object):

    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        return self.load_stream(stream)

    # Note: our notion of "equality" is that output generated by
    # equal serializers can be deserialized using the same serializer.

    # This default implementation handles the simple cases;
    # subclasses should override __eq__ as appropriate.

    def __eq__(self, other):
        return isinstance(other, self.__class__)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "%s()" % self.__class__.__name__

    def __hash__(self):
        return hash(str(self))


class FramedSerializer(Serializer):

    """
    Serializer that writes objects as a stream of (length, data) pairs,
    where C{length} is a 32-bit integer and data is C{length} bytes.
    """

    def __init__(self):
        # On Python 2.6, we can't write bytearrays to streams, so we need to convert them
        # to strings first. Check if the version number is that old.
        self._only_write_strings = sys.version_info[0:2] <= (2, 6)

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            self._write_with_length(obj, stream)

    def load_stream(self, stream):
        while True:
            try:
                yield self._read_with_length(stream)
            except EOFError:
                return

    def _write_with_length(self, obj, stream):
        serialized = self.dumps(obj)
        if len(serialized) > (1 << 31):
            raise ValueError("can not serialize object larger than 2G")
        write_int(len(serialized), stream)
        if self._only_write_strings:
            stream.write(str(serialized))
        else:
            stream.write(serialized)

    def _read_with_length(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        obj = stream.read(length)
        if obj == "":
            raise EOFError
        return self.loads(obj)

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def loads(self, obj):
        """
        Deserialize an object from a byte array.
        """
        raise NotImplementedError
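

# A minimal sketch of the frame layout described in FramedSerializer's
# docstring (illustration only; this helper is hypothetical and not part of
# the API): each frame is a 4-byte big-endian length header followed by
# exactly that many bytes of serialized data.
def _example_frame(data):
    # struct.pack("!i", n) produces the 32-bit big-endian length prefix.
    return struct.pack("!i", len(data)) + data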


class BatchedSerializer(Serializer):

    """
    Serializes a stream of objects in batches by calling its wrapped
    Serializer with streams of objects.
    """

    UNLIMITED_BATCH_SIZE = -1
    UNKNOWN_BATCH_SIZE = 0

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
        elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"):
            n = len(iterator)
            for i in xrange(0, n, self.batchSize):
                yield iterator[i: i + self.batchSize]
        else:
            items = []
            count = 0
            for item in iterator:
                items.append(item)
                count += 1
                if count == self.batchSize:
                    yield items
                    items = []
                    count = 0
            if items:
                yield items

    def dump_stream(self, iterator, stream):
        self.serializer.dump_stream(self._batched(iterator), stream)

    def load_stream(self, stream):
        return chain.from_iterable(self._load_stream_without_unbatching(stream))

    def _load_stream_without_unbatching(self, stream):
        return self.serializer.load_stream(stream)

    def __eq__(self, other):
        return (isinstance(other, BatchedSerializer) and
                other.serializer == self.serializer and other.batchSize == self.batchSize)

    def __repr__(self):
        return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)


class AutoBatchedSerializer(BatchedSerializer):

    """
    Choose the size of batches automatically based on the size of the objects.
    """

    def __init__(self, serializer, bestSize=1 << 16):
        BatchedSerializer.__init__(self, serializer, self.UNKNOWN_BATCH_SIZE)
        self.bestSize = bestSize

    def dump_stream(self, iterator, stream):
        batch, best = 1, self.bestSize
        iterator = iter(iterator)
        while True:
            vs = list(itertools.islice(iterator, batch))
            if not vs:
                break

            bytes = self.serializer.dumps(vs)
            write_int(len(bytes), stream)
            stream.write(bytes)

            # Grow the batch while serialized batches stay under bestSize
            # bytes; shrink it when a batch overshoots bestSize tenfold.
            size = len(bytes)
            if size < best:
                batch *= 2
            elif size > best * 10 and batch > 1:
                batch /= 2

    def __eq__(self, other):
        return (isinstance(other, AutoBatchedSerializer) and
                other.serializer == self.serializer and other.bestSize == self.bestSize)

    def __str__(self):
        return "AutoBatchedSerializer(%s)" % str(self.serializer)


class CartesianDeserializer(FramedSerializer):

    """
    Deserializes the JavaRDD cartesian() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def prepare_keys_values(self, stream):
        key_stream = self.key_ser._load_stream_without_unbatching(stream)
        val_stream = self.val_ser._load_stream_without_unbatching(stream)
        key_is_batched = isinstance(self.key_ser, BatchedSerializer)
        val_is_batched = isinstance(self.val_ser, BatchedSerializer)
        for (keys, vals) in izip(key_stream, val_stream):
            keys = keys if key_is_batched else [keys]
            vals = vals if val_is_batched else [vals]
            yield (keys, vals)

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            for pair in product(keys, vals):
                yield pair

    def __eq__(self, other):
        return (isinstance(other, CartesianDeserializer) and
                self.key_ser == other.key_ser and self.val_ser == other.val_ser)

    def __repr__(self):
        return "CartesianDeserializer(%s, %s)" % \
            (str(self.key_ser), str(self.val_ser))
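

# What the unbatching above yields, in miniature (illustration only; this
# helper is hypothetical): for each aligned pair of key/value batches read
# from the stream, every key is paired with every value via product().
def _example_cartesian_pairs():
    return list(product([1, 2], ['a']))  # -> [(1, 'a'), (2, 'a')]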


class PairDeserializer(CartesianDeserializer):

    """
    Deserializes the JavaRDD zip() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            if len(keys) != len(vals):
                raise ValueError("Can not deserialize RDD with different number of items"
                                 " in pair: (%d, %d)" % (len(keys), len(vals)))
            for pair in izip(keys, vals):
                yield pair

    def __eq__(self, other):
        return (isinstance(other, PairDeserializer) and
                self.key_ser == other.key_ser and self.val_ser == other.val_ser)

    def __repr__(self):
        return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))


class NoOpSerializer(FramedSerializer):

    def loads(self, obj):
        return obj

    def dumps(self, obj):
        return obj


# Hook namedtuple, make it picklable

__cls = {}


def _restore(name, fields, value):
    """ Restore an object of a namedtuple """
    k = (name, fields)
    cls = __cls.get(k)
    if cls is None:
        cls = collections.namedtuple(name, fields)
        __cls[k] = cls
    return cls(*value)


def _hack_namedtuple(cls):
    """ Make a class generated by namedtuple picklable """
    name = cls.__name__
    fields = cls._fields

    def __reduce__(self):
        return (_restore, (name, fields, tuple(self)))
    cls.__reduce__ = __reduce__
    return cls


def _hijack_namedtuple():
    """ Hack namedtuple() to make it picklable """
    # hijack only one time
    if hasattr(collections.namedtuple, "__hijack"):
        return

    global _old_namedtuple  # otherwise it would be captured in a closure

    def _copy_func(f):
        return types.FunctionType(f.func_code, f.func_globals, f.func_name,
                                  f.func_defaults, f.func_closure)

    _old_namedtuple = _copy_func(collections.namedtuple)

    def namedtuple(*args, **kwargs):
        cls = _old_namedtuple(*args, **kwargs)
        return _hack_namedtuple(cls)

    # replace namedtuple with the new one
    collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple
    collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple
    collections.namedtuple.func_code = namedtuple.func_code
    collections.namedtuple.__hijack = 1

    # hack the classes already generated by namedtuple;
    # those created in other modules can be pickled as normal,
    # so only hack those in the __main__ module
    for n, o in sys.modules["__main__"].__dict__.iteritems():
        if (type(o) is type and o.__base__ is tuple
                and hasattr(o, "_fields")
                and "__reduce__" not in o.__dict__):
            _hack_namedtuple(o)  # hack inplace


_hijack_namedtuple()
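

# A quick sketch of what the hook above buys us (illustration only; the
# helper and names are hypothetical). After _hijack_namedtuple(), classes
# created by collections.namedtuple carry a __reduce__ that round-trips
# through _restore, so cPickle can handle instances even when the class was
# defined in __main__:
def _example_namedtuple_roundtrip():
    Point = collections.namedtuple("Point", "x y")
    return cPickle.loads(cPickle.dumps(Point(1, 2), 2))  # -> Point(x=1, y=2)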


class PickleSerializer(FramedSerializer):

    """
    Serializes objects using Python's cPickle serializer:

        http://docs.python.org/2/library/pickle.html

    This serializer supports nearly any Python object, but may
    not be as fast as more specialized serializers.
    """

    def dumps(self, obj):
        return cPickle.dumps(obj, 2)

    def loads(self, obj):
        return cPickle.loads(obj)


class CloudPickleSerializer(PickleSerializer):

    def dumps(self, obj):
        return cloudpickle.dumps(obj, 2)


class MarshalSerializer(FramedSerializer):

    """
    Serializes objects using Python's Marshal serializer:

        http://docs.python.org/2/library/marshal.html

    This serializer is faster than PickleSerializer but supports fewer datatypes.
    """

    def dumps(self, obj):
        return marshal.dumps(obj)

    def loads(self, obj):
        return marshal.loads(obj)


class AutoSerializer(FramedSerializer):

    """
    Choose marshal or cPickle as the serialization protocol automatically.
    """

    def __init__(self):
        FramedSerializer.__init__(self)
        self._type = None

    def dumps(self, obj):
        if self._type is not None:
            return 'P' + cPickle.dumps(obj, -1)
        try:
            return 'M' + marshal.dumps(obj)
        except Exception:
            self._type = 'P'
            return 'P' + cPickle.dumps(obj, -1)

    def loads(self, obj):
        _type = obj[0]
        if _type == 'M':
            return marshal.loads(obj[1:])
        elif _type == 'P':
            return cPickle.loads(obj[1:])
        else:
            raise ValueError("invalid serialization type: %s" % _type)


class CompressedSerializer(FramedSerializer):

    """
    Compress the serialized data
    """

    def __init__(self, serializer):
        FramedSerializer.__init__(self)
        assert isinstance(serializer, FramedSerializer), "serializer must be a FramedSerializer"
        self.serializer = serializer

    def dumps(self, obj):
        return zlib.compress(self.serializer.dumps(obj), 1)

    def loads(self, obj):
        return self.serializer.loads(zlib.decompress(obj))

    def __eq__(self, other):
        return isinstance(other, CompressedSerializer) and self.serializer == other.serializer
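

# Round-trip sketch (illustration only; this helper is hypothetical):
# pickle an object, zlib-compress the payload, then recover the original.
def _example_compressed_roundtrip(obj):
    ser = CompressedSerializer(PickleSerializer())
    return ser.loads(ser.dumps(obj))  # == obj for picklable inputs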


class UTF8Deserializer(Serializer):

    """
    Deserializes streams written by String.getBytes.
    """

    def __init__(self, use_unicode=False):
        self.use_unicode = use_unicode

    def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s

    def load_stream(self, stream):
        try:
            while True:
                yield self.loads(stream)
        except struct.error:
            return
        except EOFError:
            return

    def __eq__(self, other):
        return isinstance(other, UTF8Deserializer) and self.use_unicode == other.use_unicode


def read_long(stream):
    length = stream.read(8)
    if length == "":
        raise EOFError
    return struct.unpack("!q", length)[0]


def write_long(value, stream):
    stream.write(struct.pack("!q", value))


def pack_long(value):
    return struct.pack("!q", value)


def read_int(stream):
    length = stream.read(4)
    if length == "":
        raise EOFError
    return struct.unpack("!i", length)[0]


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def write_with_length(obj, stream):
    write_int(len(obj), stream)
    stream.write(obj)
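

# Round-trip sketch for the framing helpers above (illustration only; this
# helper is hypothetical): write a big-endian int into an in-memory buffer
# and read it back.
def _example_int_roundtrip():
    from StringIO import StringIO  # Python 2, matching this module
    buf = StringIO()
    write_int(42, buf)
    buf.seek(0)
    return read_int(buf)  # -> 42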


if __name__ == '__main__':
    import doctest
    doctest.testmod()