[SPARK-19287][CORE][STREAMING] JavaPairRDD flatMapValues requires function returning Iterable, not Iterator

## What changes were proposed in this pull request?

Fix an old oversight in the Java API: `flatMapValues` should take a `FlatMapFunction`, whose `call` method returns an `Iterator`, rather than a `Function` returning a `java.lang.Iterable`, matching the other Java `flatMap` methods.
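
For illustration, a minimal, self-contained sketch of a Java caller under the new signature (the class name, master URL, and sample data are illustrative and not part of this change):

```java
import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FlatMapValuesExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("FlatMapValuesExample").setMaster("local[2]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
          new Tuple2<>("a", 1),
          new Tuple2<>("b", 2)));

      // With the FlatMapFunction-based signature the lambda returns an Iterator;
      // previously it had to return an Iterable (e.g. the List itself).
      JavaPairRDD<String, Integer> exploded =
          pairs.flatMapValues(v -> Arrays.asList(v, v * 10).iterator());

      // Expected pairs: (a,1), (a,10), (b,2), (b,20)
      System.out.println(exploded.collect());
    }
  }
}
```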

## How was this patch tested?

Existing tests.

Closes #22690 from srowen/SPARK-19287.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sean Owen 2018-10-12 18:10:59 -05:00
parent e965fb55ac
commit 1ddfab8c4f
5 changed files with 14 additions and 11 deletions


@@ -19,7 +19,7 @@ package org.apache.spark.api.java
 import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList}
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -34,7 +34,8 @@ import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
+import org.apache.spark.api.java.function.{FlatMapFunction, Function => JFunction,
+  Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
@@ -674,8 +675,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Pass each value in the key-value pair RDD through a flatMap function without changing the
    * keys; this also retains the original RDD's partitioning.
    */
-  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
-    def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
+  def flatMapValues[U](f: FlatMapFunction[V, U]): JavaPairRDD[K, U] = {
+    def fn: (V) => Iterator[U] = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }
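
For existing Java callers of `JavaPairRDD.flatMapValues`, the migration typically amounts to returning an iterator from the same lambda, as the test updates later in this commit do. A brief sketch (the class and method names are illustrative):

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;

class FlatMapValuesMigrationSketch {
  static JavaPairRDD<String, Integer> expand(JavaPairRDD<String, Integer> pairs) {
    // Before this change the lambda returned the Iterable itself:
    //   return pairs.flatMapValues(v -> Arrays.asList(v, v * 10));
    // With FlatMapFunction it returns an Iterator over the same values:
    return pairs.flatMapValues(v -> Arrays.asList(v, v * 10).iterator());
  }
}
```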


@@ -36,7 +36,9 @@ object MimaExcludes {
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version"),
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.java.JavaPairRDD.flatMapValues"),
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaPairDStream.flatMapValues")
   )
 
   // Exclude rules for 2.4.x


@@ -34,7 +34,8 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils, Optional}
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{FlatMapFunction, Function => JFunction,
+  Function2 => JFunction2}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
@@ -562,9 +563,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Return a new DStream by applying a flatmap function to the value of each key-value pairs in
    * 'this' DStream without changing the key.
    */
-  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
-    import scala.collection.JavaConverters._
-    def fn: (V) => Iterable[U] = (x: V) => f.apply(x).asScala
+  def flatMapValues[U](f: FlatMapFunction[V, U]): JavaPairDStream[K, U] = {
+    def fn: (V) => Iterator[U] = (x: V) => f.call(x).asScala
     implicit val cm: ClassTag[U] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
     dstream.flatMapValues(fn)
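
The `JavaPairDStream` overload changes in the same way; a brief sketch of a Java call site (the helper class and names are illustrative):

```java
import java.util.Arrays;

import org.apache.spark.streaming.api.java.JavaPairDStream;

class PairDStreamFlatMapValuesSketch {
  static JavaPairDStream<String, String> expandValues(JavaPairDStream<String, String> pairStream) {
    // As with JavaPairRDD, the value-flattening lambda now returns an Iterator
    // rather than an Iterable.
    return pairStream.flatMapValues(v -> Arrays.asList(v + "-a", v + "-b").iterator());
  }
}
```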


@@ -841,7 +841,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializable
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<String, String> flatMapped =
-        pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
+        pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2").iterator());
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
     Assert.assertEquals(expected, result);


@@ -1355,7 +1355,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable
         List<String> out = new ArrayList<>();
         out.add(in + "1");
         out.add(in + "2");
-        return out;
+        return out.iterator();
       });
     JavaTestUtils.attachTestOutputStream(flatMapped);