diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 119e0459c5..b89effc16d 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -316,6 +316,7 @@ private object SpecialLengths { val PYTHON_EXCEPTION_THROWN = -2 val TIMING_DATA = -3 val END_OF_STREAM = -4 + val NULL = -5 } private[spark] object PythonRDD extends Logging { @@ -374,54 +375,25 @@ private[spark] object PythonRDD extends Logging { } def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) { - // The right way to implement this would be to use TypeTags to get the full - // type of T. Since I don't want to introduce breaking changes throughout the - // entire Spark API, I have to use this hacky approach: - if (iter.hasNext) { - val first = iter.next() - val newIter = Seq(first).iterator ++ iter - first match { - case arr: Array[Byte] => - newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes => - dataOut.writeInt(bytes.length) - dataOut.write(bytes) - } - case string: String => - newIter.asInstanceOf[Iterator[String]].foreach { str => - writeUTF(str, dataOut) - } - case stream: PortableDataStream => - newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream => - val bytes = stream.toArray() - dataOut.writeInt(bytes.length) - dataOut.write(bytes) - } - case (key: String, stream: PortableDataStream) => - newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach { - case (key, stream) => - writeUTF(key, dataOut) - val bytes = stream.toArray() - dataOut.writeInt(bytes.length) - dataOut.write(bytes) - } - case (key: String, value: String) => - newIter.asInstanceOf[Iterator[(String, String)]].foreach { - case (key, value) => - writeUTF(key, dataOut) - writeUTF(value, dataOut) - } - case (key: Array[Byte], value: Array[Byte]) => - newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach { - case (key, value) => - dataOut.writeInt(key.length) - dataOut.write(key) - dataOut.writeInt(value.length) - dataOut.write(value) - } - case other => - throw new SparkException("Unexpected element type " + first.getClass) - } + + def write(obj: Any): Unit = obj match { + case null => + dataOut.writeInt(SpecialLengths.NULL) + case arr: Array[Byte] => + dataOut.writeInt(arr.length) + dataOut.write(arr) + case str: String => + writeUTF(str, dataOut) + case stream: PortableDataStream => + write(stream.toArray()) + case (key, value) => + write(key) + write(value) + case other => + throw new SparkException("Unexpected element type " + other.getClass) } + + iter.foreach(write) } /** diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index be5ebfa921..b7cfc8bd9c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -22,6 +22,7 @@ import java.io.{File, InputStream, IOException, OutputStream} import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkContext +import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} private[spark] object PythonUtils { /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */ @@ -39,4 +40,8 @@ private[spark] object PythonUtils { def mergePythonPaths(paths: String*): String = { paths.filter(_ != "").mkString(File.pathSeparator) } + + def generateRDDWithNull(sc: JavaSparkContext): JavaRDD[String] = { + sc.parallelize(List("a", null, "b")) + } } diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala index 7b866f08a0..c63d834f90 100644 --- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala @@ -23,11 +23,22 @@ import org.scalatest.FunSuite class PythonRDDSuite extends FunSuite { - test("Writing large strings to the worker") { - val input: List[String] = List("a"*100000) - val buffer = new DataOutputStream(new ByteArrayOutputStream) - PythonRDD.writeIteratorToStream(input.iterator, buffer) - } + test("Writing large strings to the worker") { + val input: List[String] = List("a"*100000) + val buffer = new DataOutputStream(new ByteArrayOutputStream) + PythonRDD.writeIteratorToStream(input.iterator, buffer) + } + test("Handle nulls gracefully") { + val buffer = new DataOutputStream(new ByteArrayOutputStream) + // Should not have NPE when write an Iterator with null in it + // The correctness will be tested in Python + PythonRDD.writeIteratorToStream(Iterator("a", null), buffer) + PythonRDD.writeIteratorToStream(Iterator(null, "a"), buffer) + PythonRDD.writeIteratorToStream(Iterator("a".getBytes, null), buffer) + PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes), buffer) + PythonRDD.writeIteratorToStream(Iterator((null, null), ("a", null), (null, "b")), buffer) + PythonRDD.writeIteratorToStream( + Iterator((null, null), ("a".getBytes, null), (null, "b".getBytes)), buffer) + } } - diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py new file mode 100644 index 0000000000..ed398a82b8 --- /dev/null +++ b/examples/src/main/python/streaming/kafka_wordcount.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + Usage: network_wordcount.py + + To run this on your local machine, you need to setup Kafka and create a producer first, see + http://kafka.apache.org/documentation.html#quickstart + + and then run the example + `$ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/\ + spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \ + localhost:2181 test` +""" + +import sys + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +from pyspark.streaming.kafka import KafkaUtils + +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, "Usage: kafka_wordcount.py " + exit(-1) + + sc = SparkContext(appName="PythonStreamingKafkaWordCount") + ssc = StreamingContext(sc, 1) + + zkQuorum, topic = sys.argv[1:] + kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) + lines = kvs.map(lambda x: x[1]) + counts = lines.flatMap(lambda line: line.split(" ")) \ + .map(lambda word: (word, 1)) \ + .reduceByKey(lambda a, b: a+b) + counts.pprint() + + ssc.start() + ssc.awaitTermination() diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml new file mode 100644 index 0000000000..503fc129dc --- /dev/null +++ b/external/kafka-assembly/pom.xml @@ -0,0 +1,106 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.3.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-kafka-assembly_2.10 + jar + Spark Project External Kafka Assembly + http://spark.apache.org/ + + + streaming-kafka-assembly + scala-${scala.binary.version} + spark-streaming-kafka-assembly-${project.version}.jar + ${project.build.directory}/${spark.jar.dir}/${spark.jar.basename} + + + + + org.apache.spark + spark-streaming-kafka_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + + false + ${spark.jar} + + + *:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + reference.conf + + + log4j.properties + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index d4112b03d9..08d1cc33e4 100644 --- a/pom.xml +++ b/pom.xml @@ -1629,6 +1629,7 @@ external/kafka + external/kafka-assembly diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ded4b5443a..fbc8983b95 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -44,8 +44,9 @@ object BuildCommons { sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl", "kinesis-asl").map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(assembly, examples, networkYarn) = - Seq("assembly", "examples", "network-yarn").map(ProjectRef(buildLocation, _)) + val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) = + Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly") + .map(ProjectRef(buildLocation, _)) val tools = ProjectRef(buildLocation, "tools") // Root project. @@ -300,7 +301,14 @@ object Assembly { sys.props.get("hadoop.version") .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String]) }, - jarName in assembly := s"${moduleName.value}-${version.value}-hadoop${hadoopVersion.value}.jar", + jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) => + if (mName.contains("streaming-kafka-assembly")) { + // This must match the same name used in maven (see external/kafka-assembly/pom.xml) + s"${mName}-${v}.jar" + } else { + s"${mName}-${v}-hadoop${hv}.jar" + } + }, mergeStrategy in assembly := { case PathList("org", "datanucleus", xs @ _*) => MergeStrategy.discard case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index b8bda83517..0ffb41d02f 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -70,6 +70,7 @@ class SpecialLengths(object): PYTHON_EXCEPTION_THROWN = -2 TIMING_DATA = -3 END_OF_STREAM = -4 + NULL = -5 class Serializer(object): @@ -133,6 +134,8 @@ class FramedSerializer(Serializer): def _write_with_length(self, obj, stream): serialized = self.dumps(obj) + if serialized is None: + raise ValueError("serialized value should not be None") if len(serialized) > (1 << 31): raise ValueError("can not serialize object larger than 2G") write_int(len(serialized), stream) @@ -145,8 +148,10 @@ class FramedSerializer(Serializer): length = read_int(stream) if length == SpecialLengths.END_OF_DATA_SECTION: raise EOFError + elif length == SpecialLengths.NULL: + return None obj = stream.read(length) - if obj == "": + if len(obj) < length: raise EOFError return self.loads(obj) @@ -484,6 +489,8 @@ class UTF8Deserializer(Serializer): length = read_int(stream) if length == SpecialLengths.END_OF_DATA_SECTION: raise EOFError + elif length == SpecialLengths.NULL: + return None s = stream.read(length) return s.decode("utf-8") if self.use_unicode else s diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py new file mode 100644 index 0000000000..19ad71f99d --- /dev/null +++ b/python/pyspark/streaming/kafka.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from py4j.java_collections import MapConverter +from py4j.java_gateway import java_import, Py4JError + +from pyspark.storagelevel import StorageLevel +from pyspark.serializers import PairDeserializer, NoOpSerializer +from pyspark.streaming import DStream + +__all__ = ['KafkaUtils', 'utf8_decoder'] + + +def utf8_decoder(s): + """ Decode the unicode as UTF-8 """ + return s and s.decode('utf-8') + + +class KafkaUtils(object): + + @staticmethod + def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, + storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2, + keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): + """ + Create an input stream that pulls messages from a Kafka Broker. + + :param ssc: StreamingContext object + :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..). + :param groupId: The group id for this consumer. + :param topics: Dict of (topic_name -> numPartitions) to consume. + Each partition is consumed in its own thread. + :param kafkaParams: Additional params for Kafka + :param storageLevel: RDD storage level. + :param keyDecoder: A function used to decode key (default is utf8_decoder) + :param valueDecoder: A function used to decode value (default is utf8_decoder) + :return: A DStream object + """ + java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils") + + kafkaParams.update({ + "zookeeper.connect": zkQuorum, + "group.id": groupId, + "zookeeper.connection.timeout.ms": "10000", + }) + if not isinstance(topics, dict): + raise TypeError("topics should be dict") + jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client) + jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client) + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + + def getClassByName(name): + return ssc._jvm.org.apache.spark.util.Utils.classForName(name) + + try: + array = getClassByName("[B") + decoder = getClassByName("kafka.serializer.DefaultDecoder") + jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder, + jparam, jtopics, jlevel) + except Py4JError, e: + # TODO: use --jar once it also work on driver + if not e.message or 'call a package' in e.message: + print "No kafka package, please put the assembly jar into classpath:" + print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \ + "scala-*/spark-streaming-kafka-assembly-*.jar" + raise e + ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) + stream = DStream(jstream, ssc, ser) + return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v))) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index fef6c92875..c7d0622d65 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -47,9 +47,10 @@ else: from pyspark.conf import SparkConf from pyspark.context import SparkContext +from pyspark.rdd import RDD from pyspark.files import SparkFiles from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \ - CloudPickleSerializer, CompressedSerializer + CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter from pyspark.sql import SQLContext, IntegerType, Row, ArrayType, StructType, StructField, \ UserDefinedType, DoubleType @@ -716,6 +717,13 @@ class RDDTests(ReusedPySparkTestCase): wr_s21 = rdd.sample(True, 0.4, 21).collect() self.assertNotEqual(set(wr_s11), set(wr_s21)) + def test_null_in_rdd(self): + jrdd = self.sc._jvm.PythonUtils.generateRDDWithNull(self.sc._jsc) + rdd = RDD(jrdd, self.sc, UTF8Deserializer()) + self.assertEqual([u"a", None, u"b"], rdd.collect()) + rdd = RDD(jrdd, self.sc, NoOpSerializer()) + self.assertEqual(["a", None, "b"], rdd.collect()) + def test_multiple_python_java_RDD_conversions(self): # Regression test for SPARK-5361 data = [