[Spark] RDD take() method: overestimate too much

In the comment (Line 1083), it says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%."

`(1.5 * num * partsScanned / buf.size).toInt` is the guess of "num of total partitions needed". In every iteration, we should consider the increment `(1.5 * num * partsScanned / buf.size).toInt - partsScanned`
Existing implementation 'exponentially' grows `partsScanned ` ( roughly: `x_{n+1} >= (1.5 + 1) x_n`)

This could be a performance problem. (unless this is the intended behavior)

Author: yingjieMiao <yingjie@42go.com>

Closes #2648 from yingjieMiao/rdd_take and squashes the following commits:

d758218 [yingjieMiao] scala style fix
a8e74bb [yingjieMiao] python style fix
4b6e777 [yingjieMiao] infix operator style fix
4391d3b [yingjieMiao] typo fix.
692f4e6 [yingjieMiao] cap numPartsToTry
c4483dc [yingjieMiao] style fix
1d2c410 [yingjieMiao] also change in rdd.py and AsyncRDD
d31ff7e [yingjieMiao] handle the edge case after 1 iteration
a2aa36b [yingjieMiao] RDD take method: overestimate too much
This commit is contained in:
yingjieMiao 2014-10-13 13:11:55 -07:00 committed by Reynold Xin
parent 39ccabacf1
commit 49bbdcb660
3 changed files with 16 additions and 9 deletions

View file

@ -78,16 +78,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
// If we didn't find any rows after the first iteration, just try all partitions next.
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
// by 50%.
// by 50%. We also cap the estimation in the end.
if (results.size == 0) {
numPartsToTry = totalParts - 1
numPartsToTry = partsScanned * 4
} else {
numPartsToTry = (1.5 * num * partsScanned / results.size).toInt
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max(1,
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}
numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions
val left = num - results.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)

View file

@ -1081,13 +1081,15 @@ abstract class RDD[T: ClassTag](
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
// interpolate the number of partitions we need to try, but overestimate it by 50%.
// We also cap the estimation in the end.
if (buf.size == 0) {
numPartsToTry = partsScanned * 4
} else {
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}
numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions
val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)

View file

@ -1070,10 +1070,13 @@ class RDD(object):
# If we didn't find any rows after the previous iteration,
# quadruple and retry. Otherwise, interpolate the number of
# partitions we need to try, but overestimate it by 50%.
# We also cap the estimation in the end.
if len(items) == 0:
numPartsToTry = partsScanned * 4
else:
numPartsToTry = int(1.5 * num * partsScanned / len(items))
# the first paramter of max is >=1 whenever partsScanned >= 2
numPartsToTry = int(1.5 * num * partsScanned / len(items)) - partsScanned
numPartsToTry = min(max(numPartsToTry, 1), partsScanned * 4)
left = num - len(items)