[SPARK-3580][CORE] Add Consistent Method To Get Number of RDD Partitions Across Different Languages
I have tried to address all the comments in pull request https://github.com/apache/spark/pull/2447. Note that the second commit (using the new method in all internal code of all components) is quite intrusive and could be omitted. Author: Jeroen Schot <jeroen.schot@surfsara.nl> Closes #9767 from schot/master.
This commit is contained in:
parent
4375eb3f48
commit
128c29035b
|
@ -28,6 +28,7 @@ import com.google.common.base.Optional
|
|||
import org.apache.hadoop.io.compress.CompressionCodec
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.annotation.Since
|
||||
import org.apache.spark.api.java.JavaPairRDD._
|
||||
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
|
||||
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
|
||||
|
@ -62,6 +63,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
/** Set of partitions in this RDD, i.e. the wrapped `rdd.partitions` converted to a Java list. */
|
||||
def partitions: JList[Partition] = rdd.partitions.toSeq.asJava
|
||||
|
||||
/** Return the number of partitions in this RDD (delegates to the wrapped `rdd.getNumPartitions`). */
|
||||
@Since("1.6.0")
|
||||
def getNumPartitions: Int = rdd.getNumPartitions
|
||||
|
||||
/** The partitioner of this RDD: the wrapped `rdd.partitioner` passed through `JavaUtils.optionToOptional`. */
|
||||
def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner)
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
|
|||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.Partitioner._
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.annotation.{Since, DeveloperApi}
|
||||
import org.apache.spark.api.java.JavaRDD
|
||||
import org.apache.spark.partial.BoundedDouble
|
||||
import org.apache.spark.partial.CountEvaluator
|
||||
|
@ -242,6 +242,12 @@ abstract class RDD[T: ClassTag](
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of partitions of this RDD, computed as `partitions.length`. Declared `final`, so subclasses cannot override it.
|
||||
*/
|
||||
@Since("1.6.0")
|
||||
final def getNumPartitions: Int = partitions.length
|
||||
|
||||
/**
|
||||
* Get the preferred locations of a partition, taking into account whether the
|
||||
* RDD is checkpointed.
|
||||
|
|
|
@ -973,6 +973,19 @@ public class JavaAPISuite implements Serializable {
|
|||
Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
|
||||
}
|
||||
|
||||
// Checks that getNumPartitions() returns the slice count passed at creation time
// for a JavaRDD (3), a JavaDoubleRDD (2) and a JavaPairRDD (2).
@Test
|
||||
public void getNumPartitions(){
|
||||
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
|
||||
JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
|
||||
JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(Arrays.asList(
|
||||
new Tuple2<>("a", 1),
|
||||
new Tuple2<>("aa", 2),
|
||||
new Tuple2<>("aaa", 3)
|
||||
), 2);
|
||||
// Each expected value is the second argument given to the matching parallelize* call above.
Assert.assertEquals(3, rdd1.getNumPartitions());
|
||||
Assert.assertEquals(2, rdd2.getNumPartitions());
|
||||
Assert.assertEquals(2, rdd3.getNumPartitions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void repartition() {
|
||||
|
|
|
@ -34,6 +34,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
|
|||
|
||||
test("basic operations") {
|
||||
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
|
||||
assert(nums.getNumPartitions === 2)
|
||||
assert(nums.collect().toList === List(1, 2, 3, 4))
|
||||
assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
|
||||
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
|
||||
|
|
|
@ -155,6 +155,10 @@ object MimaExcludes {
|
|||
"org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor"),
|
||||
ProblemFilters.exclude[MissingClassProblem](
|
||||
"org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor$")
|
||||
) ++ Seq(
|
||||
// SPARK-3580 Add getNumPartitions method to JavaRDD
|
||||
ProblemFilters.exclude[MissingMethodProblem](
|
||||
"org.apache.spark.api.java.JavaRDDLike.getNumPartitions")
|
||||
)
|
||||
case v if v.startsWith("1.5") =>
|
||||
Seq(
|
||||
|
|
Loading…
Reference in a new issue