Fix ClassCastException in JavaPairRDD.collectAsMap() (SPARK-1040)
This fixes an issue where collectAsMap() could fail when called on a JavaPairRDD that was derived by transforming a non-JavaPairRDD. The root problem was that we were creating the JavaPairRDD's ClassTag by casting a ClassTag[AnyRef] to a ClassTag[Tuple2[K2, V2]]. To fix this, I cast a ClassTag[Tuple2[_, _]] instead, since this actually produces a ClassTag of the appropriate type because ClassTags don't capture type parameters: scala> implicitly[ClassTag[Tuple2[_, _]]] == implicitly[ClassTag[Tuple2[Int, Int]]] res8: Boolean = true scala> implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[Int, Int]]] == implicitly[ClassTag[Tuple2[Int, Int]]] res9: Boolean = false
This commit is contained in:
parent
531d9d7576
commit
740e865f40
|
@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
* Return a new RDD by applying a function to all elements of this RDD.
|
||||
*/
|
||||
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
|
||||
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
|
||||
}
|
||||
|
||||
|
@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
|
||||
import scala.collection.JavaConverters._
|
||||
def fn = (x: T) => f.apply(x).asScala
|
||||
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
|
||||
}
|
||||
|
||||
|
|
|
@ -387,18 +387,21 @@ public class JavaAPISuite implements Serializable {
|
|||
return 1.0 * x;
|
||||
}
|
||||
}).cache();
|
||||
doubles.collect();
|
||||
JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
|
||||
@Override
|
||||
public Tuple2<Integer, Integer> call(Integer x) {
|
||||
return new Tuple2<Integer, Integer>(x, x);
|
||||
}
|
||||
}).cache();
|
||||
pairs.collect();
|
||||
JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
|
||||
@Override
|
||||
public String call(Integer x) {
|
||||
return x.toString();
|
||||
}
|
||||
}).cache();
|
||||
strings.collect();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -962,4 +965,18 @@ public class JavaAPISuite implements Serializable {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void collectAsMapWithIntArrayValues() {
|
||||
// Regression test for SPARK-1040
|
||||
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
|
||||
JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
|
||||
@Override
|
||||
public Tuple2<Integer, int[]> call(Integer x) throws Exception {
|
||||
return new Tuple2<Integer, int[]>(x, new int[] { x });
|
||||
}
|
||||
});
|
||||
pairRDD.collect(); // Works fine
|
||||
Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
|
|||
|
||||
/** Return a new DStream by applying a function to all elements of this DStream. */
|
||||
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
|
||||
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
|
||||
}
|
||||
|
||||
|
@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
|
|||
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
|
||||
import scala.collection.JavaConverters._
|
||||
def fn = (x: T) => f.apply(x).asScala
|
||||
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
|
||||
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
|
||||
}
|
||||
|
||||
|
|
|
@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
|
|||
}
|
||||
|
||||
override val classTag: ClassTag[(K, V)] =
|
||||
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
|
||||
implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
|
||||
}
|
||||
|
||||
object JavaPairDStream {
|
||||
|
|
Loading…
Reference in a new issue