diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 2a03f85ab5..3a0490d077 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -159,14 +159,72 @@ class QuantileSummaries(
       other.shallowCopy
     } else {
       // Merge the two buffers.
-      // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
-      // statistics during the merging: the invariants are still respected after the merge.
-      // TODO: could replace full sort by ordered merge, the two lists are known to be sorted
-      // already.
-      val res = (sampled ++ other.sampled).sortBy(_.value)
-      val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
-      new QuantileSummaries(
-        other.compressThreshold, other.relativeError, comp, other.count + count, true)
+      // The GK algorithm is a bit unclear about it, but we need to adjust the statistics during the
+      // merging. The main idea is that samples that come from one side will suffer from the lack of
+      // precision of the other.
+      // As a concrete example, take two QuantileSummaries whose samples (value, g, delta) are:
+      // `a = [(0, 1, 0), (20, 99, 0)]` and `b = [(10, 1, 0), (30, 49, 0)]`
+      // This means `a` has 100 values, whose minimum is 0 and maximum is 20,
+      // while `b` has 50 values, between 10 and 30.
+      // The resulting samples of the merge will be:
+      // a+b = [(0, 1, 0), (10, 1, ??), (20, 99, ??), (30, 49, 0)]
+      // The values of `g` do not change, as they represent the minimum number of values between two
+      // consecutive samples. The values of `delta` should be adjusted, however.
+      // Take the case of the sample `10` from `b`. In the original stream, it could have appeared
+      // right after `0` (as expressed by `g=1`) or right before `20`, so `delta=99+0-1=98`.
+      // In the GK algorithm's style of working in terms of maximum bounds, one can observe that the
+      // maximum additional uncertainty over samples coming from `b` is `max(g_a + delta_a) =
+      // floor(2 * eps_a * n_a)`. Likewise, the additional uncertainty over samples from `a` is
+      // `floor(2 * eps_b * n_b)`.
+      // Only samples that interleave with the other side are affected. That means samples from
+      // one side that are lesser (or greater) than all samples from the other side are just copied
+      // unmodified.
+      // If the merging instances have different `relativeError`, the resulting instance will carry
+      // the largest one: `eps_ab = max(eps_a, eps_b)`.
+      // The main invariant of the GK algorithm is kept:
+      // `max(g_ab + delta_ab) <= floor(2 * eps_ab * (n_a + n_b))` since
+      // `max(g_ab + delta_ab) <= floor(2 * eps_a * n_a) + floor(2 * eps_b * n_b)`
+      // Finally, one can see how the `insert(x)` operation can be expressed as `merge([(x, 1, 0)])`
+
+      val mergedSampled = new ArrayBuffer[Stats]()
+      val mergedRelativeError = math.max(relativeError, other.relativeError)
+      val mergedCount = count + other.count
+      val additionalSelfDelta = math.floor(2 * other.relativeError * other.count).toLong
+      val additionalOtherDelta = math.floor(2 * relativeError * count).toLong
+
+      // Do a merge of two sorted lists until one of the lists is fully consumed
+      var selfIdx = 0
+      var otherIdx = 0
+      while (selfIdx < sampled.length && otherIdx < other.sampled.length) {
+        val selfSample = sampled(selfIdx)
+        val otherSample = other.sampled(otherIdx)
+
+        // Detect next sample
+        val (nextSample, additionalDelta) = if (selfSample.value < otherSample.value) {
+          selfIdx += 1
+          (selfSample, if (otherIdx > 0) additionalSelfDelta else 0)
+        } else {
+          otherIdx += 1
+          (otherSample, if (selfIdx > 0) additionalOtherDelta else 0)
+        }
+
+        // Insert it
+        mergedSampled += nextSample.copy(delta = nextSample.delta + additionalDelta)
+      }
+
+      // Copy the remaining samples from the other list
+      // (by construction, at most one `while` loop will run)
+      while (selfIdx < sampled.length) {
+        mergedSampled += sampled(selfIdx)
+        selfIdx += 1
+      }
+      while (otherIdx < other.sampled.length) {
+        mergedSampled += other.sampled(otherIdx)
+        otherIdx += 1
+      }
+
+      val comp = compressImmut(mergedSampled, 2 * mergedRelativeError * mergedCount)
+      new QuantileSummaries(other.compressThreshold, mergedRelativeError, comp, mergedCount, true)
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
index 650813975d..e53d0bbccc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -169,5 +169,22 @@ class QuantileSummariesSuite extends SparkFunSuite {
       checkQuantile(0.1, data, s)
       checkQuantile(0.001, data, s)
     }
+
+    // length of data21 is 4 * length of data22
+    val data21 = data.zipWithIndex.filter(_._2 % 5 != 0).map(_._1).toSeq
+    val data22 = data.zipWithIndex.filter(_._2 % 5 == 0).map(_._1).toSeq
+
+    test(
+      s"Merging unbalanced interleaved lists with epsi=$epsi and seq=$seq_name, " +
+        s"compression=$compression") {
+      val s1 = buildSummary(data21, epsi, compression)
+      val s2 = buildSummary(data22, epsi, compression)
+      val s = s1.merge(s2)
+      // Check all quantiles
+      for (queryRank <- 1 to n) {
+        val queryQuantile = queryRank.toDouble / n.toDouble
+        checkQuantile(queryQuantile, data, s)
+      }
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
index a4b142b7ab..29766c4651 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
@@ -124,20 +124,24 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSparkSession
   test("percentile_approx, with different accuracies") {
 
     withTempView(table) {
-      (1 to 1000).toDF("col").createOrReplaceTempView(table)
+      val tableCount = 1000
+      (1 to tableCount).toDF("col").createOrReplaceTempView(table)
 
       // With different accuracies
-      val expectedPercentile = 250D
       val accuracies = Array(1, 10, 100, 1000, 10000)
-      val errors = accuracies.map { accuracy =>
-        val df = spark.sql(s"SELECT percentile_approx(col, 0.25, $accuracy) FROM $table")
-        val approximatePercentile = df.collect().head.getInt(0)
-        val error = Math.abs(approximatePercentile - expectedPercentile)
-        error
+      val expectedPercentiles = Array(100D, 200D, 250D, 314D, 777D)
+      for (accuracy <- accuracies) {
+        for (expectedPercentile <- expectedPercentiles) {
+          val df = spark.sql(
+            s"""SELECT
+               | percentile_approx(col, $expectedPercentile/$tableCount, $accuracy)
+               |FROM $table
+             """.stripMargin)
+          val approximatePercentile = df.collect().head.getInt(0)
+          val error = Math.abs(approximatePercentile - expectedPercentile)
+          assert(error <= math.floor(tableCount.toDouble / accuracy.toDouble))
+        }
       }
-
-      // The larger accuracy value we use, the smaller error we get
-      assert(errors.sorted.sameElements(errors.reverse))
     }
   }
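
For readers skimming the new comment in `QuantileSummaries.merge`, here is a small standalone sketch of the delta adjustment it describes, runnable in a Scala REPL. `Sample` and `mergeSketch` are made-up names for illustration only (the real logic lives in the patched `merge` above), and `eps = 0.5` is an assumption chosen so that both example summaries already satisfy `max(g + delta) <= 2 * eps * n`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for QuantileSummaries.Stats, defined here only to keep
// the example self-contained.
case class Sample(value: Double, g: Long, delta: Long)

// Same idea as the patched merge: samples that interleave with the other summary
// inherit an extra delta of floor(2 * eps * n) from that other summary.
def mergeSketch(
    a: Seq[Sample], epsA: Double, countA: Long,
    b: Seq[Sample], epsB: Double, countB: Long): Seq[Sample] = {
  val extraForA = math.floor(2 * epsB * countB).toLong // uncertainty inherited from b
  val extraForB = math.floor(2 * epsA * countA).toLong // uncertainty inherited from a
  val merged = new ArrayBuffer[Sample]()
  var i = 0
  var j = 0
  while (i < a.length && j < b.length) {
    if (a(i).value < b(j).value) {
      // Adjust only if at least one sample from the other side has already been emitted.
      merged += a(i).copy(delta = a(i).delta + (if (j > 0) extraForA else 0L))
      i += 1
    } else {
      merged += b(j).copy(delta = b(j).delta + (if (i > 0) extraForB else 0L))
      j += 1
    }
  }
  // Samples smaller (or larger) than everything on the other side are copied unmodified.
  (merged ++= a.drop(i) ++= b.drop(j)).toList
}

// The example from the comment: `a` holds 100 values in [0, 20], `b` holds 50 values in [10, 30].
val a = Seq(Sample(0, 1, 0), Sample(20, 99, 0))
val b = Seq(Sample(10, 1, 0), Sample(30, 49, 0))
println(mergeSketch(a, 0.5, 100, b, 0.5, 50))
// List(Sample(0.0,1,0), Sample(10.0,1,100), Sample(20.0,99,50), Sample(30.0,49,0))
// Invariant check: max(g + delta) = 99 + 50 = 149 <= floor(2 * 0.5 * (100 + 50)) = 150.
```

The `10` from `b` now carries `delta = 100` rather than the exact `98` derived in the comment: the patch uses the upper bound `floor(2 * eps_a * n_a)` instead of recomputing the tight value per sample, which is enough to preserve the merged invariant.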
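On the `ApproximatePercentileQuerySuite` change: the new assertion bounds the absolute error by `floor(tableCount / accuracy)`. This follows from the documented behaviour of `percentile_approx`, where `1.0 / accuracy` is the relative (rank) error, together with the fact that `col` holds each of 1..1000 exactly once, so rank and value coincide. A tiny illustrative sketch of the resulting bounds (not part of the patch):

```scala
// Illustrative only: the maximum absolute error the test tolerates for each accuracy,
// assuming (per the percentile_approx documentation) a relative rank error of 1.0 / accuracy
// and a column holding each of 1..tableCount exactly once (so rank == value).
val tableCount = 1000
val accuracies = Seq(1, 10, 100, 1000, 10000)
accuracies.foreach { accuracy =>
  val bound = math.floor(tableCount.toDouble / accuracy.toDouble)
  println(s"accuracy=$accuracy -> allowed |approx - expected| <= $bound")
}
// accuracy=10000 yields a bound of 0.0, i.e. the result must be exact for this data set.
```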