FramedSerializer: _dumps => dumps, _loads => loads.

Josh Rosen 2013-11-10 17:48:27 -08:00
parent ffa5bedf46
commit 13122ceb8c
4 changed files with 18 additions and 18 deletions

python/pyspark/context.py

@@ -251,7 +251,7 @@ class SparkContext(object):
         sent to each cluster only once.
         """
         pickleSer = PickleSerializer()
-        pickled = pickleSer._dumps(value)
+        pickled = pickleSer.dumps(value)
         jbroadcast = self._jsc.broadcast(bytearray(pickled))
         return Broadcast(jbroadcast.id(), value, jbroadcast,
                          self._pickled_broadcast_vars)
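
The broadcast path above now goes through the public dumps(). A minimal sketch of what that call produces, assuming PickleSerializer keeps the cPickle protocol-2 implementation shown in the serializers.py hunk below:

    # Hedged sketch: mirrors PickleSerializer.dumps (cPickle protocol 2).
    import cPickle

    value = {"answer": 42}
    pickled = cPickle.dumps(value, 2)        # what pickleSer.dumps(value) returns
    assert cPickle.loads(pickled) == value   # loads() is the inverse
    payload = bytearray(pickled)             # wrapped for the Java-side broadcast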

python/pyspark/rdd.py

@@ -751,7 +751,7 @@ class RDD(object):
                 buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
                 yield pack_long(split)
-                yield outputSerializer._dumps(items)
+                yield outputSerializer.dumps(items)
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
         pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -970,7 +970,7 @@ class PipelinedRDD(RDD):
         else:
             serializer = self.ctx.serializer
         command = (self.func, self._prev_jrdd_deserializer, serializer)
-        pickled_command = CloudPickleSerializer()._dumps(command)
+        pickled_command = CloudPickleSerializer().dumps(command)
         broadcast_vars = ListConverter().convert(
             [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
             self.ctx._gateway._gateway_client)
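
The PipelinedRDD hunk serializes the (func, deserializer, serializer) command with CloudPickleSerializer because plain cPickle cannot pickle closures. A hedged sketch of why that matters, assuming the standalone cloudpickle module (PySpark bundles its own copy):

    import cloudpickle, cPickle

    def make_adder(n):
        return lambda x: x + n                 # a closure, like most RDD funcs

    command = (make_adder(5), None, None)      # stand-in (func, deserializer, serializer)
    pickled_command = cloudpickle.dumps(command, 2)
    func = cPickle.loads(pickled_command)[0]   # standard pickle can load it back
    assert func(1) == 6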

python/pyspark/serializers.py

@@ -125,7 +125,7 @@ class FramedSerializer(Serializer):
             return

     def _write_with_length(self, obj, stream):
-        serialized = self._dumps(obj)
+        serialized = self.dumps(obj)
         write_int(len(serialized), stream)
         stream.write(serialized)
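
For context, _write_with_length frames every dumps() result with a 4-byte length header. A minimal sketch of that wire layout, assuming write_int packs a big-endian 32-bit int as elsewhere in serializers.py:

    import struct
    from io import BytesIO

    def write_int(value, stream):
        # assumption: mirrors serializers.write_int (big-endian 32-bit)
        stream.write(struct.pack("!i", value))

    stream = BytesIO()
    serialized = "payload"              # a dumps() result
    write_int(len(serialized), stream)  # 4-byte length header
    stream.write(serialized)            # then the bytes themselves
    assert stream.getvalue() == "\x00\x00\x00\x07payload"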
@@ -134,16 +134,16 @@ class FramedSerializer(Serializer):
         obj = stream.read(length)
         if obj == "":
             raise EOFError
-        return self._loads(obj)
+        return self.loads(obj)

-    def _dumps(self, obj):
+    def dumps(self, obj):
         """
         Serialize an object into a byte array.
         When batching is used, this will be called with an array of objects.
         """
         raise NotImplementedError

-    def _loads(self, obj):
+    def loads(self, obj):
         """
         Deserialize an object from a byte array.
         """
@@ -228,8 +228,8 @@ class CartesianDeserializer(FramedSerializer):

 class NoOpSerializer(FramedSerializer):

-    def _loads(self, obj): return obj
-    def _dumps(self, obj): return obj
+    def loads(self, obj): return obj
+    def dumps(self, obj): return obj


 class PickleSerializer(FramedSerializer):
@@ -242,12 +242,12 @@ class PickleSerializer(FramedSerializer):
     not be as fast as more specialized serializers.
     """

-    def _dumps(self, obj): return cPickle.dumps(obj, 2)
-    _loads = cPickle.loads
+    def dumps(self, obj): return cPickle.dumps(obj, 2)
+    loads = cPickle.loads


 class CloudPickleSerializer(PickleSerializer):

-    def _dumps(self, obj): return cloudpickle.dumps(obj, 2)
+    def dumps(self, obj): return cloudpickle.dumps(obj, 2)


 class MarshalSerializer(FramedSerializer):
@@ -259,8 +259,8 @@ class MarshalSerializer(FramedSerializer):
     This serializer is faster than PickleSerializer but supports fewer datatypes.
     """

-    _dumps = marshal.dumps
-    _loads = marshal.loads
+    dumps = marshal.dumps
+    loads = marshal.loads


 class MUTF8Deserializer(Serializer):
@@ -268,14 +268,14 @@ class MUTF8Deserializer(Serializer):
     Deserializes streams written by Java's DataOutputStream.writeUTF().
     """

-    def _loads(self, stream):
+    def loads(self, stream):
         length = struct.unpack('>H', stream.read(2))[0]
         return stream.read(length).decode('utf8')

     def load_stream(self, stream):
         while True:
             try:
-                yield self._loads(stream)
+                yield self.loads(stream)
             except struct.error:
                 return
             except EOFError:
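
The loads() shown here reads a writeUTF()-style frame: a 2-byte big-endian length followed by the encoded bytes. A round-trip sketch (plain UTF-8 here; Java's "modified UTF-8" differs only for NUL and supplementary characters):

    import struct
    from io import BytesIO

    def write_utf(s, stream):
        # assumption: mirrors Java's DataOutputStream.writeUTF framing
        data = s.encode('utf8')
        stream.write(struct.pack('>H', len(data)))   # 2-byte length
        stream.write(data)

    buf = BytesIO()
    write_utf(u'/tmp/spark-workdir', buf)
    buf.seek(0)
    length = struct.unpack('>H', buf.read(2))[0]     # what loads() does
    assert buf.read(length).decode('utf8') == u'/tmp/spark-workdir'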

python/pyspark/worker.py

@@ -51,7 +51,7 @@ def main(infile, outfile):
         return

     # fetch name of workdir
-    spark_files_dir = mutf8_deserializer._loads(infile)
+    spark_files_dir = mutf8_deserializer.loads(infile)
     SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True
@@ -66,7 +66,7 @@ def main(infile, outfile):
     sys.path.append(spark_files_dir) # *.py files that were added will be copied here
     num_python_includes = read_int(infile)
     for _ in range(num_python_includes):
-        filename = mutf8_deserializer._loads(infile)
+        filename = mutf8_deserializer.loads(infile)
         sys.path.append(os.path.join(spark_files_dir, filename))

     command = pickleSer._read_with_length(infile)
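
Taken together, these hunks have the worker read one writeUTF string for the workdir, then an int count, then one string per Python include. A hedged simulation of that handshake against an in-memory stream, assuming read_int unpacks a big-endian 32-bit int:

    import struct
    from io import BytesIO

    def write_utf(s, stream):                  # as in the sketch above
        data = s.encode('utf8')
        stream.write(struct.pack('>H', len(data)) + data)

    infile = BytesIO()
    write_utf(u'/tmp/workdir', infile)
    infile.write(struct.pack('!i', 2))         # num_python_includes
    for name in (u'dep1.py', u'dep2.py'):
        write_utf(name, infile)
    infile.seek(0)

    read_utf = lambda f: f.read(struct.unpack('>H', f.read(2))[0]).decode('utf8')
    spark_files_dir = read_utf(infile)               # mutf8_deserializer.loads
    count = struct.unpack('!i', infile.read(4))[0]   # read_int
    includes = [read_utf(infile) for _ in range(count)]
    assert includes == [u'dep1.py', u'dep2.py']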