Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects.

This commit is contained in:
Tathagata Das 2013-12-30 11:13:24 -08:00
parent 6e43039614
commit f4e4066191
50 changed files with 1613 additions and 600 deletions

View file

@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.dstream.SparkFlumeEvent;
import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
* Produces a count of events received from Flume.
@ -49,10 +50,10 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", port);
flumeStream.count();
@ -63,6 +64,6 @@ public class JavaFlumeEventCount {
}
}).print();
sc.start();
ssc.start();
}
}

View file

@ -29,6 +29,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka;
import scala.Tuple2;
/**
@ -63,7 +64,8 @@ public class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
JavaPairDStream<String, String> messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override

View file

@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
/**
* Produces a count of events received from Flume.

View file

@ -24,6 +24,7 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.util.RawTextHelper._
import org.apache.spark.streaming.kafka._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.

View file

@ -17,11 +17,6 @@
package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.MQTTReceiver
import org.apache.spark.storage.StorageLevel
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.mqtt._
/**
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes
* Space separated String Message "hello mqtt demo for spark streaming"

View file

@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
import akka.util.ByteString
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.zeromq._
/**
* A simple publisher for demonstration purposes, repeatedly publishes random Messages
* every one second.

View file

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
/**
* Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
* through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions.
*/
class FlumeFunctions(ssc: StreamingContext) {
/**
* Create a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
def flumeStream (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
}
}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.streaming.dstream
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
@ -30,9 +30,10 @@ import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
private[streaming]
class FlumeInputDStream[T: ClassTag](

View file

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.flume
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
import org.apache.spark.storage.StorageLevel
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating Flume input streams.
*/
class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext)
extends JavaStreamingContext(javaStreamingContext.ssc) {
/**
* Creates a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
/**
* Creates a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port, storageLevel)
}
}

View file

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming
package object flume {
implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc)
}

View file

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.junit.Test;
public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
// tests the API, does not actually test data receiving
JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345);
JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
// To verify that JavaStreamingContextWithKafka is also StreamingContext
JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999);
}
}

View file

@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN

View file

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.flume
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.charset.Charset
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
class FlumeStreamSuite extends TestSuiteBase {
val testPort = 9999
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
val client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver)
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
client.append(event)
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
}
val startTime = System.currentTimeMillis()
while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
Thread.sleep(100)
}
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()
val decoder = Charset.forName("UTF-8").newDecoder()
assert(outputBuffer.size === input.length)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
val str = decoder.decode(outputBuffer(i).head.event.getBody)
assert(str.toString === input(i).toString)
assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
}
}
}

View file

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
import java.lang.{Integer => JInt}
import java.util.{Map => JMap}
import kafka.serializer.Decoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating Kafka input streams.
*/
class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
extends JavaStreamingContext(javaStreamingContext.ssc) {
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
def kafkaStream(
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*
*/
def kafkaStream(
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param keyTypeClass Key type of RDD
* @param valueTypeClass value type of RDD
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration paramaters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
keyTypeClass: Class[K],
valueTypeClass: Class[V],
keyDecoderClass: Class[U],
valueDecoderClass: Class[T],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
ssc.kafkaStream[K, V, U, T](
kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
}

View file

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka
import scala.reflect.ClassTag
import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
/**
* Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
* through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions.
*/
class KafkaFunctions(ssc: StreamingContext) {
/**
* Create an input stream that pulls messages from a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def kafkaStream(
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
kafkaStream[String, String, StringDecoder, StringDecoder](
kafkaParams,
topics,
storageLevel)
}
/**
* Create an input stream that pulls messages from a Kafka Broker.
* @param kafkaParams Map of kafka configuration paramaters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[(K, V)] = {
val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
}
}

View file

@ -15,11 +15,10 @@
* limitations under the License.
*/
package org.apache.spark.streaming.dstream
package org.apache.spark.streaming.kafka
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import scala.collection.Map
import scala.reflect.ClassTag
import java.util.Properties
import java.util.concurrent.Executors
@ -30,8 +29,11 @@ import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._
import scala.collection.Map
import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
/**
* Input stream that pulls messages from a Kafka Broker.

View file

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming
package object kafka {
implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc)
}

View file

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
// tests the API, does not actually test data receiving
JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics);
JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect","localhost:12345");
kafkaParams.put("group.id","consumer-group");
JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream(
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
// To verify that JavaStreamingContextWithKafka is also StreamingContext
JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999);
}
}

View file

@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN

View file

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
class KafkaStreamSuite extends TestSuiteBase {
test("kafka input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val topics = Map("my-topic" -> 1)
// tests the API, does not actually test data receiving
val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder](
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}
}

View file

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.mqtt
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating MQTT input streams.
*/
class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
extends JavaStreamingContext(javaStreamingContext.ssc) {
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
* @param brokerUrl Url of remote MQTT publisher
* @param topic topic name to subscribe to
*/
def mqttStream(
brokerUrl: String,
topic: String
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.mqttStream(brokerUrl, topic)
}
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
* @param brokerUrl Url of remote MQTT publisher
* @param topic topic name to subscribe to
* @param storageLevel RDD storage level.
*/
def mqttStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.mqttStream(brokerUrl, topic, storageLevel)
}
}

View file

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
/**
* Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
* through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
*/
class MQTTFunctions(ssc: StreamingContext) {
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
* @param brokerUrl Url of remote MQTT publisher
* @param topic topic name to subscribe to
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
*/
def mqttStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
}
}

View file

@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.spark.streaming.dstream
package org.apache.spark.streaming.mqtt
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.util.Properties
import java.util.concurrent.Executors
@ -34,10 +35,10 @@ import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
/**
* Input stream that subscribe messages from a Mqtt Broker.
@ -49,15 +50,14 @@ import scala.reflect.ClassTag
private[streaming]
class MQTTInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}

View file

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming
package object mqtt {
implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
}

View file

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
@Test
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc);
// tests the API, does not actually test data receiving
JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic);
JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
// To verify that JavaStreamingContextWithKafka is also StreamingContext
JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999);
}
}

View file

@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN

View file

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.mqtt
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
class MQTTStreamSuite extends TestSuiteBase {
test("MQTT input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val brokerUrl = "abc"
val topic = "def"
// tests the API, does not actually test data receiving
val test1 = ssc.mqttStream(brokerUrl, topic)
val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}
}

View file

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating Twitter input streams.
*/
class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext)
extends JavaStreamingContext(javaStreamingContext.ssc) {
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
*/
def twitterStream(): JavaDStream[Status] = {
ssc.twitterStream(None)
}
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(filters: Array[String]): JavaDStream[Status] = {
ssc.twitterStream(None, filters)
}
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
ssc.twitterStream(None, filters, storageLevel)
}
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization
*/
def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
ssc.twitterStream(Some(twitterAuth))
}
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(
twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
ssc.twitterStream(Some(twitterAuth), filters)
}
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization object
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
}
}

View file

@ -1,27 +0,0 @@
package org.apache.spark.streaming.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
class StreamingContextWithTwitter(ssc: StreamingContext) {
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
* authorization; this uses the system properties twitter4j.oauth.consumerKey,
* .consumerSecret, .accessToken and .accessTokenSecret.
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
twitterAuth: Option[Authorization] = None,
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
}
}

View file

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
/**
* Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
* through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions.
*/
class TwitterFunctions(ssc: StreamingContext) {
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
* authorization; this uses the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
}
}

View file

@ -1,7 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming
package object twitter {
implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = {
new StreamingContextWithTwitter(ssc)
}
implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc)
}

View file

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.twitter;
import java.util.Arrays;
import org.junit.Test;
import twitter4j.Status;
import twitter4j.auth.Authorization;
import twitter4j.auth.NullAuthorization;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc);
String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
JavaDStream<Status> test1 = sscWithTwitter.twitterStream();
JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters);
JavaDStream<Status> test3 =
sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth);
JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters);
JavaDStream<Status> test6 =
sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
// To verify that JavaStreamingContextWithKafka is also StreamingContext
JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999);
}
}

View file

@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN

View file

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.twitter
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.{NullAuthorization, Authorization}
class TwitterStreamSuite extends TestSuiteBase {
test("kafka input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val filters = Seq("filter1", "filter2")
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
val test1 = ssc.twitterStream(None)
val test2 = ssc.twitterStream(None, filters)
val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val test4 = ssc.twitterStream(Some(authorization))
val test5 = ssc.twitterStream(Some(authorization), filters)
val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream
}
}

View file

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.zeromq
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
import akka.actor.SupervisorStrategy
import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating ZeroMQ input streams.
*/
class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
extends JavaStreamingContext(javaStreamingContext.ssc) {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote ZeroMQ publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel Storage level to use for storing the received objects
*/
def zeroMQStream[T](
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level.
*/
def zeroMQStream[T](
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
*/
def zeroMQStream[T](
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
}

View file

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.zeromq
import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.receivers._
/**
* Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
* through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
*/
class ZeroMQFunctions(ssc: StreamingContext) {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
* and each frame has sequence of byte thus it needs the converter
* (which might be deserializer of bytes) to translate from sequence
* of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def zeroMQStream[T: ClassTag](
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.streaming.receivers
package org.apache.spark.streaming.zeromq
import scala.reflect.ClassTag
@ -24,6 +24,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
import org.apache.spark.streaming.receivers._
/**
* A receiver to subscribe to ZeroMQ stream.

View file

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming
package object zeromq {
implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
}

View file

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.zeromq;
import org.junit.Test;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@Test // tests the API, does not actually test data receiving
public void testZeroMQStream() {
JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@Override
public Iterable<String> call(byte[][] bytes) throws Exception {
return null;
}
};
JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects);
JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
// To verify that JavaStreamingContextWithKafka is also StreamingContext
JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
}
}

View file

@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN

View file

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.zeromq
import akka.actor.SupervisorStrategy
import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
class ZeroMQStreamSuite extends TestSuiteBase {
test("zeromq input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val publishUrl = "abc"
val subscribe = new Subscribe(null.asInstanceOf[ByteString])
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
// tests the API, does not actually test data receiving
val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
val test2 = ssc.zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving
}
}

View file

@ -49,9 +49,6 @@ object SparkBuild extends Build {
lazy val repl = Project("repl", file("repl"), settings = replSettings)
.dependsOn(core, bagel, mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
.dependsOn(core, mllib, bagel, streaming, externalTwitter)
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
@ -60,8 +57,6 @@ object SparkBuild extends Build {
lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
lazy val externalTwitter = Project("streaming-twitter", file("external/twitter"), settings = twitterSettings) dependsOn(streaming)
lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
.dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
@ -94,10 +89,31 @@ object SparkBuild extends Build {
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
.dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj)
lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@ -167,7 +183,7 @@ object SparkBuild extends Build {
</issueManagement>
),
/*
/*
publishTo <<= version { (v: String) =>
val nexus = "https://oss.sonatype.org/"
if (v.trim.endsWith("SNAPSHOT"))
@ -176,8 +192,7 @@ object SparkBuild extends Build {
Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2")
},
*/
*/
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
@ -264,7 +279,6 @@ object SparkBuild extends Build {
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
)
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
libraryDependencies ++= Seq(
@ -302,21 +316,10 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
resolvers ++= Seq(
"Eclipse Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/",
"Apache repo" at "https://repository.apache.org/content/repositories/releases"
),
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("net.sf.jopt-simple", "jopt-simple")
excludeAll(excludeNetty),
"org.eclipse.paho" % "mqtt-client" % "0.4.0",
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
// "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
"commons-io" % "commons-io" % "2.4"
)
)
@ -331,8 +334,8 @@ object SparkBuild extends Build {
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
)
@ -358,9 +361,45 @@ object SparkBuild extends Build {
)
def twitterSettings() = streamingSettings ++ Seq(
name := "spark-twitter",
name := "spark-streaming-twitter",
libraryDependencies ++= Seq(
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
)
)
def kafkaSettings() = streamingSettings ++ Seq(
name := "spark-streaming-kafka",
libraryDependencies ++= Seq(
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("net.sf.jopt-simple", "jopt-simple")
excludeAll(excludeNetty)
)
)
def flumeSettings() = streamingSettings ++ Seq(
name := "spark-streaming-flume",
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy)
)
)
def zeromqSettings() = streamingSettings ++ Seq(
name := "spark-streaming-zeromq",
libraryDependencies ++= Seq(
"org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
)
)
def mqttSettings() = streamingSettings ++ Seq(
name := "spark-streaming-mqtt",
resolvers ++= Seq(
"Apache repo" at "https://repository.apache.org/content/repositories/releases"
),
libraryDependencies ++= Seq(
"org.eclipse.paho" % "mqtt-client" % "0.4.0"
)
)
}

View file

@ -17,21 +17,6 @@
package org.apache.spark.streaming
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import org.apache.spark.streaming.dstream._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.receivers.ActorReceiver
import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
import org.apache.spark.streaming.receivers.ZeroMQReceiver
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.receivers.ActorReceiver
import scala.collection.mutable.Queue
import scala.collection.Map
import scala.reflect.ClassTag
@ -40,15 +25,22 @@ import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
//import twitter4j.Status
//import twitter4j.auth.Authorization
import org.apache.spark.streaming.scheduler._
import akka.util.ByteString
import akka.actor.Props
import akka.actor.SupervisorStrategy
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@ -223,74 +215,6 @@ class StreamingContext private (
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
* and each frame has sequence of byte thus it needs the converter
* (which might be deserializer of bytes) to translate from sequence
* of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def zeroMQStream[T: ClassTag](
publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
/**
* Create an input stream that pulls messages from a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def kafkaStream(
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
kafkaParams,
topics,
storageLevel)
}
/**
* Create an input stream that pulls messages from a Kafka Broker.
* @param kafkaParams Map of kafka configuration paramaters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
def kafkaStream[
K: ClassTag,
V: ClassTag,
U <: kafka.serializer.Decoder[_]: Manifest,
T <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[(K, V)] = {
val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel)
registerInputStream(inputStream)
inputStream
}
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
@ -329,22 +253,6 @@ class StreamingContext private (
inputStream
}
/**
* Create a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
def flumeStream (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
registerInputStream(inputStream)
inputStream
}
/**
* Create a input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
@ -467,21 +375,6 @@ class StreamingContext private (
inputStream
}
/**
* Create an input stream that receives messages pushed by a mqtt publisher.
* @param brokerUrl Url of remote mqtt publisher
* @param topic topic name to subscribe to
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def mqttStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
registerInputStream(inputStream)
inputStream
}
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/

View file

@ -17,28 +17,21 @@
package org.apache.spark.streaming.api.java
import java.lang.{Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap, List => JList}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.io.InputStream
import java.util.{Map => JMap, List => JList}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
//import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import akka.util.ByteString
//import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.StreamingListener
/**
@ -133,81 +126,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/** The underlying SparkContext */
val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
def kafkaStream(
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt])
: JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
StorageLevel.MEMORY_ONLY_SER_2)
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level. Defaults to memory-only
*
*/
def kafkaStream(
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param keyTypeClass Key type of RDD
* @param valueTypeClass value type of RDD
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration paramaters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]](
keyTypeClass: Class[K],
valueTypeClass: Class[V],
keyDecoderClass: Class[U],
valueDecoderClass: Class[T],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaPairDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
ssc.kafkaStream[K, V, U, T](
kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
/**
* Create a input stream from network source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
@ -318,98 +236,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.fileStream[K, V, F](directory)
}
/**
* Creates a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port, storageLevel)
}
/**
* Creates a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
/*
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization object
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
}
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* .consumerSecret, .accessToken and .accessTokenSecret to be set.
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
ssc.twitterStream(None, filters, storageLevel)
}
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(
twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
ssc.twitterStream(Some(twitterAuth), filters)
}
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* .consumerSecret, .accessToken and .accessTokenSecret to be set.
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(
filters: Array[String]
): JavaDStream[Status] = {
ssc.twitterStream(None, filters)
}
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization
*/
def twitterStream(
twitterAuth: Authorization
): JavaDStream[Status] = {
ssc.twitterStream(Some(twitterAuth))
}
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* .consumerSecret, .accessToken and .accessTokenSecret to be set.
*/
def twitterStream(): JavaDStream[Status] = {
ssc.twitterStream()
}
*/
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
@ -472,70 +298,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.actorStream[T](props, name)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel Storage level to use for storing the received objects
*/
def zeroMQStream[T](
publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] Iterator[T],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def zeroMQStream[T](
publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
*/
def zeroMQStream[T](
publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
/**
* Registers an output stream that will be computed every interval
*/

View file

@ -33,6 +33,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val ssc = jobScheduler.ssc
val clockClass = System.getProperty(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
logInfo("Using clock class = " + clockClass)
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateJobs(new Time(longTime)))

View file

@ -160,7 +160,10 @@ class NetworkInputTracker(
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
// Distribute the receivers and start them
ssc.sparkContext.runJob(tempRDD, startReceiver)

View file

@ -17,23 +17,17 @@
package org.apache.spark.streaming;
import scala.Tuple2;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
import java.util.*;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import twitter4j.Status;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@ -43,39 +37,11 @@ import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.SparkFlumeEvent;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.JavaCheckpointTestUtils;
import java.io.*;
import java.util.*;
import akka.actor.Props;
import akka.zeromq.Subscribe;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite implements Serializable {
private transient JavaStreamingContext ssc;
@Before
public void setUp() {
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint");
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
}
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@ -1596,26 +1562,6 @@ public class JavaAPISuite implements Serializable {
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect","localhost:12345");
kafkaParams.put("group.id","consumer-group");
JavaPairDStream<String, String> test3 = ssc.kafkaStream(
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics,
StorageLevel.MEMORY_AND_DISK());
}
@Test
public void testSocketTextStream() {
JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
@ -1654,16 +1600,10 @@ public class JavaAPISuite implements Serializable {
public void testRawSocketStream() {
JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
}
/*
@Test
public void testFileStream() {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
}
@Test
@ -1685,5 +1625,5 @@ public class JavaAPISuite implements Serializable {
return null;
}
});
}
} */
}

View file

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;
public abstract class LocalJavaStreamingContext {
protected transient JavaStreamingContext ssc;
@Before
public void setUp() {
System.clearProperty("spark.driver.port");
System.clearProperty("spark.hostPort");
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint");
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
System.clearProperty("spark.hostPort");
}
}

View file

@ -23,7 +23,7 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
import org.apache.spark.streaming.dstream.{NetworkReceiver}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.{specific, NettyTransceiver}
import org.apache.avro.ipc.specific.SpecificRequestor
import java.nio.ByteBuffer
import collection.JavaConversions._
import java.nio.charset.Charset
import com.google.common.io.Files
import java.util.concurrent.atomic.AtomicInteger
@ -99,55 +92,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
val client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver)
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
client.append(event)
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
}
val startTime = System.currentTimeMillis()
while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
Thread.sleep(100)
}
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()
val decoder = Charset.forName("UTF-8").newDecoder()
assert(outputBuffer.size === input.length)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
val str = decoder.decode(outputBuffer(i).head.event.getBody)
assert(str.toString === input(i).toString)
assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
}
}
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
System.clearProperty("spark.streaming.clock")
@ -249,21 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
test("kafka input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val topics = Map("my-topic" -> 1)
val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
// Test specifying decoder
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
val test3 = ssc.kafkaStream[
String,
String,
kafka.serializer.StringDecoder,
kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
}
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10

View file

@ -137,11 +137,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// if you want to add your stuff to "before" (i.e., don't call before { } )
def beforeFunction() {
if (useManualClock) {
System.setProperty(
"spark.streaming.clock",
"org.apache.spark.streaming.util.ManualClock"
)
logInfo("Using manual clock")
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
} else {
logInfo("Using real clock")
System.clearProperty("spark.streaming.clock")
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
@ -273,7 +272,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
Thread.sleep(100)
Thread.sleep(10)
}
val timeTaken = System.currentTimeMillis() - startTime