Merge pull request #530 from aarondav/cleanup. Closes #530.

Remove explicit conversion to PairRDDFunctions in cogroup()

As SparkContext._ is already imported, using the implicit conversion appears to make the code much cleaner. Perhaps there was some sinister reason for doing the conversion explicitly, however.

Author: Aaron Davidson <aaron@databricks.com>

== Merge branch commits ==

commit aa4a63f1bfd5b5178fe67364dd7ce4d84c357996
Author: Aaron Davidson <aaron@databricks.com>
Date:   Sun Feb 2 23:48:04 2014 -0800

    Remove explicit conversion to PairRDDFunctions in cogroup()

    As SparkContext._ is already imported, using the implicit conversion
    appears to make the code much cleaner. Perhaps there was some sinister
    reason for doing the converion explicitly, however.
This commit is contained in:
Aaron Davidson 2014-02-03 11:25:39 -08:00 committed by Reynold Xin
parent 0386f42e38
commit 1625d8c446

View file

@ -458,8 +458,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.") throw new SparkException("Default partitioner cannot partition array keys.")
} }
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag) cg.mapValues { case Seq(vs, ws) =>
prfs.mapValues { case Seq(vs, ws) =>
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
} }
} }
@ -474,8 +473,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.") throw new SparkException("Default partitioner cannot partition array keys.")
} }
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag) cg.mapValues { case Seq(vs, w1s, w2s) =>
prfs.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
} }
} }
@ -749,7 +747,3 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass
} }
private[spark] object ClassTags {
val seqSeqClassTag = classTag[Seq[Seq[_]]]
}