[SPARK-29042][CORE] Sampling-based RDD with unordered input should be INDETERMINATE

### What changes were proposed in this pull request?

We have previously found and fixed correctness issues for RDDs whose output is INDETERMINATE. One missing part is sampling-based RDDs. This kind of RDD is order-sensitive to its input: a sampling-based RDD with unordered input should be INDETERMINATE.

### Why are the changes needed?

A sampling-based RDD with unordered input behaves just like a MapPartitionsRDD whose isOrderSensitive parameter is true: its output can differ after a rerun.

This is a problem in ML applications.

In ML, sampling is used to prepare training data, and the algorithm then fits a model on the sampled data. If rerun sampling tasks produce different output during model fitting, the results become unreliable and effectively buggy.

Each sample is random, but once the data has been sampled, reruns should reproduce the same output.
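
For illustration, a minimal sketch of the failure mode (a hypothetical example, assuming an existing `SparkContext` named `sc`):

```scala
// The reduceByKey shuffle makes partition contents UNORDERED: the set of
// elements is fixed across reruns, but their order within a partition is not.
val counts = sc.parallelize(1 to 1000, 4)
  .map(x => (x % 100, 1))
  .reduceByKey(_ + _)

// sample() draws per-element random numbers positionally. If a rerun task
// sees the same elements in a different order, a different subset is kept,
// which is exactly the INDETERMINATE behavior this patch makes explicit.
val train = counts.sample(withReplacement = false, fraction = 0.3, seed = 42L)
```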

### Does this PR introduce any user-facing change?

Previously, a sampling-based RDD could produce different output after a rerun.
After this patch, a sampling-based RDD with unordered input is INDETERMINATE. For an INDETERMINATE map stage, the Spark scheduler currently retries all tasks of the failed stage.

### How was this patch tested?

Added a test to DAGSchedulerSuite.

Closes #25751 from viirya/sample-order-sensitive.

Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
Date: 2019-09-13 14:07:00 -07:00
Commit: c610de6952 (parent: e63098b287)
5 changed files with 56 additions and 12 deletions

@@ -261,7 +261,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
}
- self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+ self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
}
/**
@@ -291,7 +291,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
}
- self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+ self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
}
/**
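
The two hunks above cover the stratified samplers, `sampleByKey` (exact = false) and `sampleByKeyExact` (exact = true). A minimal usage sketch, assuming an existing `SparkContext` named `sc`:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4)))
// Keep roughly half of the values for each key.
val fractions = Map("a" -> 0.5, "b" -> 0.5)
// With isOrderSensitive = true, the scheduler now knows that rerunning a task
// of `sampled` over reordered input may keep a different subset.
val sampled = pairs.sampleByKey(withReplacement = false, fractions, seed = 7L)
```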

@@ -67,4 +67,12 @@ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
thisSampler.setSeed(split.seed)
thisSampler.sample(firstParent[T].iterator(split.prev, context))
}
+ override protected def getOutputDeterministicLevel = {
+   if (prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+     DeterministicLevel.INDETERMINATE
+   } else {
+     super.getOutputDeterministicLevel
+   }
+ }
}
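
To make the override's intent concrete, here is a small standalone model of the escalation rule. The level names mirror Spark's `DeterministicLevel` values; everything else is illustrative:

```scala
object DeterminismModel {
  sealed trait Level
  case object DETERMINATE extends Level   // same output, same order, on rerun
  case object UNORDERED extends Level     // same elements, order may differ (e.g. after a shuffle)
  case object INDETERMINATE extends Level // the output itself may differ on rerun

  // Sampling consumes its input positionally, so UNORDERED input escalates
  // to INDETERMINATE; other levels fall back to the default propagation.
  def sampledOutputLevel(parent: Level): Level = parent match {
    case UNORDERED => INDETERMINATE
    case other     => other
  }
}
```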

@@ -555,7 +555,7 @@ abstract class RDD[T: ClassTag](
val sampler = new BernoulliCellSampler[T](lb, ub)
sampler.setSeed(seed + index)
sampler.sample(partition)
- }, preservesPartitioning = true)
+ }, isOrderSensitive = true, preservesPartitioning = true)
}
/**
@@ -868,6 +868,29 @@ abstract class RDD[T: ClassTag](
preservesPartitioning)
}
+ /**
+  * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+  * of the original partition.
+  *
+  * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+  * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+  *
+  * `isOrderSensitive` indicates whether the function is order-sensitive. If it is order
+  * sensitive, it may return totally different result when the input order
+  * is changed. Mostly stateful functions are order-sensitive.
+  */
+ private[spark] def mapPartitionsWithIndex[U: ClassTag](
+     f: (Int, Iterator[T]) => Iterator[U],
+     preservesPartitioning: Boolean,
+     isOrderSensitive: Boolean): RDD[U] = withScope {
+   val cleanedF = sc.clean(f)
+   new MapPartitionsRDD(
+     this,
+     (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+     preservesPartitioning,
+     isOrderSensitive = isOrderSensitive)
+ }
/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
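
For illustration, a hypothetical caller of the new overload (it is `private[spark]`, so only Spark-internal code such as the samplers above can reach it; assumes an existing `SparkContext` named `sc`):

```scala
// zipWithIndex over the partition iterator is order-sensitive: if a rerun
// sees the elements in a different order, the (value, index) pairing changes,
// which is what isOrderSensitive = true declares to the scheduler.
val data = sc.parallelize(Seq("a", "b", "c", "d"), 2)
val tagged = data.mapPartitionsWithIndex(
  (pid, iter) => iter.zipWithIndex.map { case (v, i) => (s"$pid-$i", v) },
  preservesPartitioning = false,
  isOrderSensitive = true)
```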

@@ -2783,6 +2783,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
.contains("Spark cannot rollback the ShuffleMapStage 1"))
}
+ test("SPARK-29042: Sampled RDD with unordered input should be indeterminate") {
+   val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = false)
+   val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+   val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+   assert(shuffleMapRdd2.outputDeterministicLevel == DeterministicLevel.UNORDERED)
+   val sampledRdd = shuffleMapRdd2.sample(true, 0.3, 1000L)
+   assert(sampledRdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
+ }
private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId

@@ -103,15 +103,16 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
(part, (e.srcId, e.dstId, e.attr))
}
.partitionBy(new HashPartitioner(numPartitions))
- .mapPartitionsWithIndex( { (pid, iter) =>
-   val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
-   iter.foreach { message =>
-     val data = message._2
-     builder.add(data._1, data._2, data._3)
-   }
-   val edgePartition = builder.toEdgePartition
-   Iterator((pid, edgePartition))
- }, preservesPartitioning = true)).cache()
+ .mapPartitionsWithIndex(
+   { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
+     val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
+     iter.foreach { message =>
+       val data = message._2
+       builder.add(data._1, data._2, data._3)
+     }
+     val edgePartition = builder.toEdgePartition
+     Iterator((pid, edgePartition))
+   }, preservesPartitioning = true)).cache()
GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}