[SPARK-17439][SQL] Fixing compression issues with approximate quantiles and adding more tests

## What changes were proposed in this pull request?

This PR builds on #14976 and fixes a correctness bug that caused the wrong quantile to be returned for small target errors.
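
For context on the correctness claim: with relative error `epsi`, `query(q)` is supposed to return a value whose rank in the data lies within roughly `epsi * n` of the target rank `q * n`. A minimal standalone sketch of that check, patterned after the suite's `checkQuantile` helper (the threshold, data set, and tolerance below are illustrative assumptions, not the suite's exact code):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Small relative error: the regime where the wrong quantile was returned.
val eps = 0.0001
val data = (1 to 100000).map(_.toDouble)

// Build the summary by inserting every value, then compress once at the end.
var summary = new QuantileSummaries(1000, eps) // (compression threshold, relative error)
data.foreach(x => summary = summary.insert(x))
summary = summary.compress()

// The approximate answer must have a rank within about eps * n of the target rank.
val q = 0.5
val approx = summary.query(q)
val rank = data.count(_ < approx)
val target = q * data.size
assert(math.abs(rank - target) <= eps * data.size + 1,
  s"rank $rank too far from target $target for eps=$eps")
```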

## How was this patch tested?

This PR adds 8 unit tests that were failing without the fix.

Author: Timothy Hunter <timhunter@databricks.com>
Author: Sean Owen <sowen@cloudera.com>

Closes #15002 from thunterdb/ml-1783.
Authored by Timothy Hunter on 2016-09-11 08:03:45 +01:00; committed by Sean Owen
parent 29ba9578f4
commit 180796ecb3
2 changed files with 39 additions and 6 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala

@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.util
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
@@ -61,7 +61,12 @@ class QuantileSummaries(
   def insert(x: Double): QuantileSummaries = {
     headSampled += x
     if (headSampled.size >= defaultHeadSize) {
-      this.withHeadBufferInserted
+      val result = this.withHeadBufferInserted
+      if (result.sampled.length >= compressThreshold) {
+        result.compress()
+      } else {
+        result
+      }
     } else {
       this
     }
@@ -236,7 +241,7 @@ object QuantileSummaries {
     if (currentSamples.isEmpty) {
       return Array.empty[Stats]
     }
-    val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+    val res = ListBuffer.empty[Stats]
     // Start for the last element, which is always part of the set.
     // The head contains the current new head, that may be merged with the current element.
     var head = currentSamples.last
@@ -258,7 +263,10 @@
     }
     res.prepend(head)
     // If necessary, add the minimum element:
-    res.prepend(currentSamples.head)
+    val currHead = currentSamples.head
+    if (currHead.value < head.value) {
+      res.prepend(currentSamples.head)
+    }
     res.toArray
   }
 }
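
To make the effect of the `insert` change concrete: once the head buffer is flushed, the summary now compresses itself whenever the backing sample grows past `compressThreshold`, so code that only ever calls `insert` no longer accumulates an uncompressed sample indefinitely. A hedged usage sketch, assuming the two-argument constructor used by the test suite and the `sampled`/`count` members visible in the diff (the threshold and input size are illustrative):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Deliberately small compression threshold (the suite uses 1000 and 10).
var s = new QuantileSummaries(10, 0.1)

// Only call insert; never call compress() explicitly. The head buffer is
// flushed in batches, and with this change each flush that pushes the
// backing sample past the threshold is followed by a compression.
(1 to 200000).foreach(i => s = s.insert(i.toDouble))

// A final compress folds in whatever is still sitting in the head buffer.
s = s.compress()
println(s"count=${s.count}, retained samples=${s.sampled.length}")
```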

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala

@@ -40,6 +40,20 @@ class QuantileSummariesSuite extends SparkFunSuite {
     summary.compress()
   }
 
+  /**
+   * Interleaves compression and insertions.
+   */
+  private def buildCompressSummary(
+      data: Seq[Double],
+      epsi: Double,
+      threshold: Int): QuantileSummaries = {
+    var summary = new QuantileSummaries(threshold, epsi)
+    data.foreach { x =>
+      summary = summary.insert(x).compress()
+    }
+    summary
+  }
+
   private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
     val approx = summary.query(quant)
     // The rank of the approximation.
@@ -54,8 +68,8 @@ class QuantileSummariesSuite extends SparkFunSuite {
   for {
     (seq_name, data) <- Seq(increasing, decreasing, random)
-    epsi <- Seq(0.1, 0.0001)
-    compression <- Seq(1000, 10)
+    epsi <- Seq(0.1, 0.0001) // With a significant value and with full precision
+    compression <- Seq(1000, 10) // This interleaves n so that we test without and with compression
   } {
     test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
@@ -75,6 +89,17 @@ class QuantileSummariesSuite extends SparkFunSuite {
       checkQuantile(0.1, data, s)
       checkQuantile(0.001, data, s)
     }
+
+    test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression " +
+      s"(interleaved)") {
+      val s = buildCompressSummary(data, epsi, compression)
+      assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
   }
 
   // Tests for merging procedure