Fixed updateStateByKey to work with primitive types.

This commit is contained in:
Tathagata Das 2013-01-14 17:18:39 -08:00
parent 131be5d62e
commit 1638fcb0dc
3 changed files with 8 additions and 8 deletions

View file

@ -377,7 +377,7 @@ extends Serializable {
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S <: AnyRef : ClassManifest](
def updateStateByKey[S: ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
@ -392,7 +392,7 @@ extends Serializable {
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S <: AnyRef : ClassManifest](
def updateStateByKey[S: ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
): DStream[(K, S)] = {
@ -408,7 +408,7 @@ extends Serializable {
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S <: AnyRef : ClassManifest](
def updateStateByKey[S: ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = {
@ -431,7 +431,7 @@ extends Serializable {
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
* @tparam S State type
*/
def updateStateByKey[S <: AnyRef : ClassManifest](
def updateStateByKey[S: ClassManifest](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean

View file

@ -7,7 +7,7 @@ import spark.storage.StorageLevel
import spark.streaming.{Duration, Time, DStream}
private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,

View file

@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase {
)
val updateStateOperation = (s: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
}
s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
s.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
}
testOperation(inputData, updateStateOperation, outputData, true)