diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e231336823..1e39e10856 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }
 
   /**
@@ -291,7 +291,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index 15691a8fc8..c8cdaa60e4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -67,4 +67,12 @@ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     thisSampler.setSeed(split.seed)
     thisSampler.sample(firstParent[T].iterator(split.prev, context))
   }
+
+  override protected def getOutputDeterministicLevel = {
+    if (prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+      DeterministicLevel.INDETERMINATE
+    } else {
+      super.getOutputDeterministicLevel
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 53268b169d..08fc309d52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -555,7 +555,7 @@ abstract class RDD[T: ClassTag](
       val sampler = new BernoulliCellSampler[T](lb, ub)
       sampler.setSeed(seed + index)
       sampler.sample(partition)
-    }, preservesPartitioning = true)
+    }, isOrderSensitive = true, preservesPartitioning = true)
   }
 
   /**
@@ -868,6 +868,29 @@ abstract class RDD[T: ClassTag](
       preservesPartitioning)
   }
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   *
+   * `isOrderSensitive` indicates whether the function is order-sensitive. If it is order
+   * sensitive, it may return totally different result when the input order
+   * is changed. Mostly stateful functions are order-sensitive.
+   */
+  private[spark] def mapPartitionsWithIndex[U: ClassTag](
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean,
+      isOrderSensitive: Boolean): RDD[U] = withScope {
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
+  }
+
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
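Note on why the sampling functions above are marked order-sensitive: a seeded Bernoulli-style sampler consumes one random draw per input element, so the i-th draw is bound to whichever element happens to arrive i-th. When the input is the UNORDERED output of a shuffle, a retried task may see the same elements in a different fetch order and therefore select a different subset of them, not merely a reordered one. The following sketch illustrates this failure mode in plain Scala, outside Spark; the object and method names are hypothetical, not Spark's sampler API.

import scala.util.Random

// Minimal stand-in for a seeded per-partition sampler (hypothetical names,
// plain Scala, no Spark dependency).
object OrderSensitivityDemo {
  def sample[T](partition: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    // One draw per element: which element gets which draw depends on order.
    partition.filter(_ => rng.nextDouble() < fraction)
  }

  def main(args: Array[String]): Unit = {
    val elements = 1 to 20
    // Same elements, same seed, different arrival order -- as can happen when
    // a shuffle's reduce-side fetch order changes between task attempts.
    val runA = sample(elements, 0.3, seed = 1000L)
    val runB = sample(elements.reverse, 0.3, seed = 1000L)
    println(s"runA = $runA")
    println(s"runB = $runB")
    // The two runs generally pick different *sets* of elements, which is why
    // the result is INDETERMINATE rather than merely UNORDERED.
    println(s"same set? ${runA.toSet == runB.toSet}")
  }
}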
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d6cd57fa6e..7cb7eceec6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2783,6 +2783,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }
 
+  test("SPARK-29042: Sampled RDD with unordered input should be indeterminate") {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = false)
+
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+
+    assert(shuffleMapRdd2.outputDeterministicLevel == DeterministicLevel.UNORDERED)
+
+    val sampledRdd = shuffleMapRdd2.sample(true, 0.3, 1000L)
+    assert(sampledRdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
+  }
+
   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
     val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
     val shuffleId = shuffleDep.shuffleId
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 0a97ab4926..8564597f4f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -103,15 +103,16 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       (part, (e.srcId, e.dstId, e.attr))
     }
       .partitionBy(new HashPartitioner(numPartitions))
-      .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
-        iter.foreach { message =>
-          val data = message._2
-          builder.add(data._1, data._2, data._3)
-        }
-        val edgePartition = builder.toEdgePartition
-        Iterator((pid, edgePartition))
-      }, preservesPartitioning = true)).cache()
+      .mapPartitionsWithIndex(
+        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
+          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
+          iter.foreach { message =>
+            val data = message._2
+            builder.add(data._1, data._2, data._3)
+          }
+          val edgePartition = builder.toEdgePartition
+          Iterator((pid, edgePartition))
+        }, preservesPartitioning = true)).cache()
     GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
 
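The rule encoded by the new getOutputDeterministicLevel override can be stated compactly. The sketch below is a simplified standalone model, not Spark's actual DeterministicLevel implementation: it captures only the rule that an order-sensitive computation over UNORDERED input yields INDETERMINATE output, which is exactly what the new DAGSchedulerSuite test pins down.

// Simplified standalone model of the propagation rule (illustrative only;
// not Spark's org.apache.spark.rdd.DeterministicLevel implementation).
object DeterminismRule {
  sealed trait Level
  case object Determinate extends Level   // same elements, same order, on every rerun
  case object Unordered extends Level     // same elements, order may vary (e.g. shuffle output)
  case object Indeterminate extends Level // even the elements may vary across reruns

  // Output level of a map-like stage, given its input level and whether its
  // function depends on input order (as seeded samplers do).
  def outputLevel(input: Level, isOrderSensitive: Boolean): Level = input match {
    case Unordered if isOrderSensitive => Indeterminate // the SPARK-29042 case
    case other => other // otherwise the input level propagates unchanged
  }
}

Once a sampled RDD reports INDETERMINATE instead of UNORDERED, the scheduler knows that after a fetch failure it cannot mix retained output from one task attempt with recomputed output from another, since the two attempts may have sampled different rows; it must roll the stage back as a whole (or abort) rather than silently return a corrupted sample, which is the motivation for this change.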