[SPARK-31518][CORE] Expose filterByRange in JavaPairRDD

### What changes were proposed in this pull request?

This exposes the `filterByRange` method from `OrderedRDDFunctions` in the Java API (as a method of JavaPairRDD).

This is the only method of `OrderedRDDFunctions` that has not yet been exposed in the Java API.

### Why are the changes needed?

This improves the consistency between the Scala and Java APIs. Calling the Scala method manually from a Java context is cumbersome as it requires passing many ClassTags.

### Does this PR introduce any user-facing change?

Yes, a new method in the Java API.

### How was this patch tested?

With unit tests. The implementation of the Scala method is already tested independently and it was not touched in this PR.

Suggesting srowen as a reviewer.

Closes #28293 from wetneb/SPARK-31518.

Authored-by: Antonin Delpeuch <antonin@delpeuch.eu>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Antonin Delpeuch 2020-04-22 20:04:17 -07:00 committed by Dongjoon Hyun
parent da3c6c4e35
commit 497024956a
No known key found for this signature in database
GPG key ID: EDA00CE834F0FC5C
2 changed files with 57 additions and 0 deletions

View file

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._ import org.apache.spark.Partitioner._
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{FlatMapFunction, Function => JFunction, import org.apache.spark.api.java.function.{FlatMapFunction, Function => JFunction,
@ -937,6 +938,34 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
} }
  /**
   * Return an RDD containing only the elements in the inclusive range `lower` to `upper`.
   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
   * performed efficiently by only scanning the partitions that might contain matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
   *
   * NOTE(review): this overload compares keys with their natural ordering, so it assumes
   * `K` implements `java.lang.Comparable` — a `ClassCastException` will surface at runtime
   * otherwise; confirm against callers.
   *
   * @param lower inclusive lower bound of the key range to keep
   * @param upper inclusive upper bound of the key range to keep
   * @since 3.1.0
   */
  @Since("3.1.0")
  def filterByRange(lower: K, upper: K): JavaPairRDD[K, V] = {
    // Guava's natural ordering spares Java callers from having to construct an
    // Ordering/ClassTag by hand; delegate to the Comparator-based overload.
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
    filterByRange(comp, lower, upper)
  }
  /**
   * Return an RDD containing only the elements in the inclusive range `lower` to `upper`,
   * as determined by the given comparator.
   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
   * performed efficiently by only scanning the partitions that might contain matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
   *
   * @param comp comparator defining the key ordering used for the range check
   * @param lower inclusive lower bound of the key range to keep
   * @param upper inclusive upper bound of the key range to keep
   * @since 3.1.0
   */
  @Since("3.1.0")
  def filterByRange(comp: Comparator[K], lower: K, upper: K): JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).filterByRange(lower, upper))
  }
/** /**
* Return an RDD with the keys of each tuple. * Return an RDD with the keys of each tuple.
*/ */

View file

@ -251,6 +251,34 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8))); Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8)));
} }
@Test
public void filterByRange() {
List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
pairs.add(new Tuple2<>(0, 5));
pairs.add(new Tuple2<>(1, 8));
pairs.add(new Tuple2<>(2, 6));
pairs.add(new Tuple2<>(3, 8));
pairs.add(new Tuple2<>(4, 8));
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs).sortByKey();
// Default comparator
JavaPairRDD<Integer, Integer> filteredRDD = rdd.filterByRange(3, 11);
List<Tuple2<Integer, Integer>> filteredPairs = filteredRDD.collect();
assertEquals(filteredPairs.size(), 2);
assertEquals(filteredPairs.get(0), new Tuple2<>(3, 8));
assertEquals(filteredPairs.get(1), new Tuple2<>(4, 8));
// Custom comparator
filteredRDD = rdd.filterByRange(Collections.reverseOrder(), 3, -2);
filteredPairs = filteredRDD.collect();
assertEquals(filteredPairs.size(), 4);
assertEquals(filteredPairs.get(0), new Tuple2<>(0, 5));
assertEquals(filteredPairs.get(1), new Tuple2<>(1, 8));
assertEquals(filteredPairs.get(2), new Tuple2<>(2, 6));
assertEquals(filteredPairs.get(3), new Tuple2<>(3, 8));
}
@Test @Test
public void emptyRDD() { public void emptyRDD() {
JavaRDD<String> rdd = sc.emptyRDD(); JavaRDD<String> rdd = sc.emptyRDD();