[SPARK-4608][Streaming] Reorganize StreamingContext implicit to improve API convenience
There is only one implicit function `toPairDStreamFunctions` in `StreamingContext`. This PR applies a reorganization similar to [SPARK-4397](https://issues.apache.org/jira/browse/SPARK-4397).

To check backward compatibility, I compiled the following code with Spark Streaming 1.1.0 and ran it with this PR; everything works fine.

```scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object StreamingApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("/some/path")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Author: zsxwing <zsxwing@gmail.com>

Closes #3464 from zsxwing/SPARK-4608 and squashes the following commits:

aa6d44a [zsxwing] Fix a copy-paste error
f74c190 [zsxwing] Merge branch 'master' into SPARK-4608
e6f9cc9 [zsxwing] Update the docs
27833bb [zsxwing] Remove `import StreamingContext._`
c15162c [zsxwing] Reorganize StreamingContext implicit to improve API convenience
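For reference, the same application written against this PR (Spark 1.3+) no longer needs the implicit import, because the `toPairDStreamFunctions` conversion now lives in the `DStream` companion object. A minimal sketch (illustrative, not part of the patch):

```scala
import org.apache.spark._
import org.apache.spark.streaming._
// Note: `import org.apache.spark.streaming.StreamingContext._` is no longer
// required; the implicit conversion is found in the DStream companion object.

object StreamingApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("/some/path")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    // reduceByKey resolves through DStream.toPairDStreamFunctions automatically.
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```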
parent f205fe477c
commit f9ed2b6641
@@ -75,7 +75,7 @@ main entry point for all streaming functionality. We create a local StreamingContext
 {% highlight scala %}
 import org.apache.spark._
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+

 // Create a local StreamingContext with two working thread and batch interval of 1 second.
 // The master requires 2 cores to prevent from a starvation scenario.

@@ -107,7 +107,7 @@ each line will be split into multiple words and the stream of words is represented as the
 `words` DStream. Next, we want to count these words.

 {% highlight scala %}
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
 // Count each word in each batch
 val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)
@@ -22,7 +22,6 @@ import java.util.Properties
 import kafka.producer._

 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.kafka._
 import org.apache.spark.SparkConf

@@ -23,7 +23,6 @@ import java.net.Socket
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.receiver.Receiver

 /**
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._

 /**
  * Counts words in new text files created in the given directory
@@ -22,7 +22,6 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.mqtt._
 import org.apache.spark.SparkConf

@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.storage.StorageLevel

 /**
@@ -22,7 +22,6 @@ import scala.collection.mutable.SynchronizedQueue
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._

 object QueueStream {

@@ -25,7 +25,6 @@ import com.google.common.io.Files
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.util.IntParam

 /**
@@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming
 import org.apache.spark.SparkConf
 import org.apache.spark.HashPartitioner
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._

 /**
  * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
@@ -23,7 +23,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.twitter._

 // scalastyle:off
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming

 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import StreamingContext._
 import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.twitter._
 import org.apache.spark.SparkConf
@@ -24,7 +24,6 @@ import akka.zeromq.Subscribe
 import akka.util.ByteString

 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.zeromq._

 import scala.language.implicitConversions
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming.clickstream

 import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.examples.streaming.StreamingExamples
 // scalastyle:off
 /** Analyses a streaming dataset of web page views. This class demonstrates several types of
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream

 /**
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.Map
 import scala.collection.mutable.Queue
-import scala.language.implicitConversions
 import scala.reflect.ClassTag

 import akka.actor.{Props, SupervisorStrategy}
@@ -523,9 +522,11 @@ object StreamingContext extends Logging {

   private[streaming] val DEFAULT_CLEANER_TTL = 3600

-  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+  @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
+  def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
       (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-    new PairDStreamFunctions[K, V](stream)
+    DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
   }

   /**
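The deprecated forwarder above keeps pre-1.3 call sites source-compatible. As an illustration (hypothetical user code, not part of this commit), a caller that invokes the conversion explicitly still compiles, now with a deprecation warning, and is routed to `DStream.toPairDStreamFunctions`:

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical pre-1.3 style call site. Against this PR it still compiles,
// but emits a deprecation warning and forwards to DStream.toPairDStreamFunctions.
def countWords(words: DStream[String]): DStream[(String, Int)] = {
  val pairs = words.map(word => (word, 1))
  StreamingContext.toPairDStreamFunctions(pairs).reduceByKey(_ + _)
}
```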
@@ -36,7 +36,6 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream

 /**
@@ -815,6 +814,6 @@ object JavaPairDStream {

   def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
     : JavaPairDStream[K, JLong] = {
-    StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+    DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
   }
 }
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.dstream

 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

-import scala.deprecated
 import scala.collection.mutable.HashMap
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
@@ -48,8 +48,7 @@ import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
  * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
  * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
  * `join`. These operations are automatically available on any DStream of pairs
- * (e.g., DStream[(Int, Int)] through implicit conversions when
- * `org.apache.spark.streaming.StreamingContext._` is imported.
+ * (e.g., DStream[(Int, Int)] through implicit conversions.
  *
  * DStreams internally is characterized by a few basic properties:
  *  - A list of other DStreams that the DStream depends on
@@ -802,10 +801,21 @@ abstract class DStream[T: ClassTag] (
     }
   }

-private[streaming] object DStream {
+object DStream {
+
+  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
+  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
+  // it automatically. However, we still keep the old function in StreamingContext for backward
+  // compatibility and forward to the following function directly.
+
+  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
+    PairDStreamFunctions[K, V] = {
+    new PairDStreamFunctions[K, V](stream)
+  }

   /** Get the creation site of a DStream from the stack trace of when the DStream is created. */
-  def getCreationSite(): CallSite = {
+  private[streaming] def getCreationSite(): CallSite = {
     val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
     val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
     val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
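The whole reorganization rests on Scala's implicit-scope rules: when the compiler searches for an implicit conversion involving `DStream`, it automatically considers the `DStream` companion object, so no wildcard import is needed. A minimal, self-contained sketch of that language mechanism (plain Scala, no Spark types; all names here are illustrative):

```scala
import scala.language.implicitConversions

class Box[T](val value: T)

// Extra operations made available on Box[Int] via an implicit conversion.
class IntBoxOps(box: Box[Int]) {
  def doubled: Int = box.value * 2
}

object Box {
  // Implicits defined in a type's companion object are part of its implicit
  // scope, so callers get the conversion without importing anything.
  implicit def toIntBoxOps(box: Box[Int]): IntBoxOps = new IntBoxOps(box)
}

object CompanionImplicitDemo extends App {
  println(new Box(21).doubled) // prints 42; no `import Box._` required
}
```

This is exactly why `import StreamingContext._` becomes unnecessary once the implicit moves from `StreamingContext` (an unrelated object, outside `DStream`'s implicit scope) into `object DStream`.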
@@ -27,12 +27,10 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Duration, Time}
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName

 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
- * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
- * these functions.
  */
 class PairDStreamFunctions[K, V](self: DStream[(K,V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
@@ -17,8 +17,6 @@

 package org.apache.spark.streaming.dstream

-import org.apache.spark.streaming.StreamingContext._
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
 import org.apache.spark.Partitioner
@@ -26,7 +26,7 @@ package org.apache.spark
  * available only on DStreams
  * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically
  * available on any DStream of the right type (e.g. DStream[(Int, Int)] through implicit
- * conversions when you `import org.apache.spark.streaming.StreamingContext._`.
+ * conversions.
  *
  * For the Java API of Spark Streaming, take a look at the
 * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and
@@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
 import org.apache.spark.HashPartitioner

@@ -30,7 +30,6 @@ import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
@@ -20,7 +20,6 @@ package org.apache.spark.streaming
 import org.apache.spark.Logging
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import org.apache.spark.streaming.StreamingContext._

 import scala.util.Random
 import scala.collection.mutable.ArrayBuffer
@@ -17,7 +17,6 @@

 package org.apache.spark.streaming

-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.storage.StorageLevel
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streamingtest
+
+/**
+ * A test suite to make sure all `implicit` functions work correctly.
+ *
+ * As `implicit` is a compiler feature, we don't need to run this class.
+ * What we need to do is making the compiler happy.
+ */
+class ImplicitSuite {
+
+  // We only want to test if `implicit` works well with the compiler, so we don't need a real DStream.
+  def mockDStream[T]: org.apache.spark.streaming.dstream.DStream[T] = null
+
+  def testToPairDStreamFunctions(): Unit = {
+    val dstream: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = mockDStream
+    dstream.groupByKey()
+  }
+}
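Since the suite only needs to compile, the same pattern extends to any other `PairDStreamFunctions` method; a hypothetical additional check (not in this commit) could look like:

```scala
// Hypothetical extra compile-only check: if this compiles, reduceByKey also
// resolved through the DStream companion-object implicit.
def testReduceByKey(): Unit = {
  val dstream: org.apache.spark.streaming.dstream.DStream[(String, Int)] = mockDStream
  dstream.reduceByKey(_ + _)
}
```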