[SPARK-29336][SQL] Fix the implementation of QuantileSummaries.merge (guarantee that the relativeError will be respected)
### What changes were proposed in this pull request? Reimplement `org.apache.spark.sql.catalyst.util.QuantileSummaries#merge` and add a test-case showing the previous bug. ### Why are the changes needed? The original Greenwald-Khanna paper, from which the algorithm behind `approxQuantile` was taken, does not cover how to merge the result of multiple parallel QuantileSummaries. The current implementation violates some invariants and therefore the effective error can be larger than the specified. ### Does this PR introduce any user-facing change? Yes, for same cases, the results from `approxQuantile` (`percentile_approx` in SQL) will now be within the expected error margin. For example: ```scala var values = (1 to 100).toArray val all_quantiles = values.indices.map(i => (i+1).toDouble / values.length).toArray for (n <- 0 until 5) { var df = spark.sparkContext.makeRDD(values).toDF("value").repartition(5) val all_answers = df.stat.approxQuantile("value", all_quantiles, 0.1) val all_answered_ranks = all_answers.map(ans => values.indexOf(ans)).toArray val error = all_answered_ranks.zipWithIndex.map({ case (answer, expected) => Math.abs(expected - answer) }).toArray val max_error = error.max print(max_error + "\n") } ``` In the current build it returns: ``` 16 12 10 11 17 ``` I couldn't run the code with this patch applied to double check the implementation. Can someone please confirm it now outputs at most `10`, please? ### How was this patch tested? A new unit test was added to uncover the previous bug. Closes #26029 from sitegui/SPARK-29336. Authored-by: Guilherme <sitegui@sitegui.com.br> Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
parent
4e6d31f570
commit
de360e96d7
|
@ -159,14 +159,72 @@ class QuantileSummaries(
|
|||
other.shallowCopy
|
||||
} else {
|
||||
// Merge the two buffers.
|
||||
// The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
|
||||
// statistics during the merging: the invariants are still respected after the merge.
|
||||
// TODO: could replace full sort by ordered merge, the two lists are known to be sorted
|
||||
// already.
|
||||
val res = (sampled ++ other.sampled).sortBy(_.value)
|
||||
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
|
||||
new QuantileSummaries(
|
||||
other.compressThreshold, other.relativeError, comp, other.count + count, true)
|
||||
// The GK algorithm is a bit unclear about it, but we need to adjust the statistics during the
|
||||
// merging. The main idea is that samples that come from one side will suffer from the lack of
|
||||
// precision of the other.
|
||||
// As a concrete example, take two QuantileSummaries whose samples (value, g, delta) are:
|
||||
// `a = [(0, 1, 0), (20, 99, 0)]` and `b = [(10, 1, 0), (30, 49, 0)]`
|
||||
// This means `a` has 100 values, whose minimum is 0 and maximum is 20,
|
||||
// while `b` has 50 values, between 10 and 30.
|
||||
// The resulting samples of the merge will be:
|
||||
// a+b = [(0, 1, 0), (10, 1, ??), (20, 99, ??), (30, 49, 0)]
|
||||
// The values of `g` do not change, as they represent the minimum number of values between two
|
||||
// consecutive samples. The values of `delta` should be adjusted, however.
|
||||
// Take the case of the sample `10` from `b`. In the original stream, it could have appeared
|
||||
// right after `0` (as expressed by `g=1`) or right before `20`, so `delta=99+0-1=98`.
|
||||
// In the GK algorithm's style of working in terms of maximum bounds, one can observe that the
|
||||
// maximum additional uncertainty over samples comming from `b` is `max(g_a + delta_a) =
|
||||
// floor(2 * eps_a * n_a)`. Likewise, additional uncertainty over samples from `a` is
|
||||
// `floor(2 * eps_b * n_b)`.
|
||||
// Only samples that interleave the other side are affected. That means that samples from
|
||||
// one side that are lesser (or greater) than all samples from the other side are just copied
|
||||
// unmodifed.
|
||||
// If the merging instances have different `relativeError`, the resulting instance will cary
|
||||
// the largest one: `eps_ab = max(eps_a, eps_b)`.
|
||||
// The main invariant of the GK algorithm is kept:
|
||||
// `max(g_ab + delta_ab) <= floor(2 * eps_ab * (n_a + n_b))` since
|
||||
// `max(g_ab + delta_ab) <= floor(2 * eps_a * n_a) + floor(2 * eps_b * n_b)`
|
||||
// Finally, one can see how the `insert(x)` operation can be expressed as `merge([(x, 1, 0])`
|
||||
|
||||
val mergedSampled = new ArrayBuffer[Stats]()
|
||||
val mergedRelativeError = math.max(relativeError, other.relativeError)
|
||||
val mergedCount = count + other.count
|
||||
val additionalSelfDelta = math.floor(2 * other.relativeError * other.count).toLong
|
||||
val additionalOtherDelta = math.floor(2 * relativeError * count).toLong
|
||||
|
||||
// Do a merge of two sorted lists until one of the lists is fully consumed
|
||||
var selfIdx = 0
|
||||
var otherIdx = 0
|
||||
while (selfIdx < sampled.length && otherIdx < other.sampled.length) {
|
||||
val selfSample = sampled(selfIdx)
|
||||
val otherSample = other.sampled(otherIdx)
|
||||
|
||||
// Detect next sample
|
||||
val (nextSample, additionalDelta) = if (selfSample.value < otherSample.value) {
|
||||
selfIdx += 1
|
||||
(selfSample, if (otherIdx > 0) additionalSelfDelta else 0)
|
||||
} else {
|
||||
otherIdx += 1
|
||||
(otherSample, if (selfIdx > 0) additionalOtherDelta else 0)
|
||||
}
|
||||
|
||||
// Insert it
|
||||
mergedSampled += nextSample.copy(delta = nextSample.delta + additionalDelta)
|
||||
}
|
||||
|
||||
// Copy the remaining samples from the other list
|
||||
// (by construction, at most one `while` loop will run)
|
||||
while (selfIdx < sampled.length) {
|
||||
mergedSampled += sampled(selfIdx)
|
||||
selfIdx += 1
|
||||
}
|
||||
while (otherIdx < other.sampled.length) {
|
||||
mergedSampled += other.sampled(otherIdx)
|
||||
otherIdx += 1
|
||||
}
|
||||
|
||||
val comp = compressImmut(mergedSampled, 2 * mergedRelativeError * mergedCount)
|
||||
new QuantileSummaries(other.compressThreshold, mergedRelativeError, comp, mergedCount, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -169,5 +169,22 @@ class QuantileSummariesSuite extends SparkFunSuite {
|
|||
checkQuantile(0.1, data, s)
|
||||
checkQuantile(0.001, data, s)
|
||||
}
|
||||
|
||||
// length of data21 is 4 * length of data22
|
||||
val data21 = data.zipWithIndex.filter(_._2 % 5 != 0).map(_._1).toSeq
|
||||
val data22 = data.zipWithIndex.filter(_._2 % 5 == 0).map(_._1).toSeq
|
||||
|
||||
test(
|
||||
s"Merging unbalanced interleaved lists with epsi=$epsi and seq=$seq_name, " +
|
||||
s"compression=$compression") {
|
||||
val s1 = buildSummary(data21, epsi, compression)
|
||||
val s2 = buildSummary(data22, epsi, compression)
|
||||
val s = s1.merge(s2)
|
||||
// Check all quantiles
|
||||
for (queryRank <- 1 to n) {
|
||||
val queryQuantile = queryRank.toDouble / n.toDouble
|
||||
checkQuantile(queryQuantile, data, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -124,20 +124,24 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSparkSession
|
|||
test("percentile_approx, with different accuracies") {
|
||||
|
||||
withTempView(table) {
|
||||
(1 to 1000).toDF("col").createOrReplaceTempView(table)
|
||||
val tableCount = 1000
|
||||
(1 to tableCount).toDF("col").createOrReplaceTempView(table)
|
||||
|
||||
// With different accuracies
|
||||
val expectedPercentile = 250D
|
||||
val accuracies = Array(1, 10, 100, 1000, 10000)
|
||||
val errors = accuracies.map { accuracy =>
|
||||
val df = spark.sql(s"SELECT percentile_approx(col, 0.25, $accuracy) FROM $table")
|
||||
val approximatePercentile = df.collect().head.getInt(0)
|
||||
val error = Math.abs(approximatePercentile - expectedPercentile)
|
||||
error
|
||||
val expectedPercentiles = Array(100D, 200D, 250D, 314D, 777D)
|
||||
for (accuracy <- accuracies) {
|
||||
for (expectedPercentile <- expectedPercentiles) {
|
||||
val df = spark.sql(
|
||||
s"""SELECT
|
||||
| percentile_approx(col, $expectedPercentile/$tableCount, $accuracy)
|
||||
|FROM $table
|
||||
""".stripMargin)
|
||||
val approximatePercentile = df.collect().head.getInt(0)
|
||||
val error = Math.abs(approximatePercentile - expectedPercentile)
|
||||
assert(error <= math.floor(tableCount.toDouble / accuracy.toDouble))
|
||||
}
|
||||
}
|
||||
|
||||
// The larger accuracy value we use, the smaller error we get
|
||||
assert(errors.sorted.sameElements(errors.reverse))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue