Improved support for histograms

spatial-index
Oliver Kennedy 2024-04-27 22:14:56 -04:00
parent f278396c78
commit 95c1c7bd49
Signed by: okennedy
GPG Key ID: 3E5F9B3ABD3FDB60
16 changed files with 270 additions and 31 deletions

View File

@@ -33,9 +33,17 @@ object Pip {
registerFunction("num_const", distribution.numerical.ConstantNumber.Constructor(_))
registerFunction("clamp", distribution.numerical.Clamp.Constructor)
registerFunction("discretize", distribution.numerical.Discretized.Constructor)
registerFunction("dist_between", distribution.boolean.Between.Constructor)
spark.udf.register("entropy", udf.Entropy.udf)
spark.udf.register("kl_divergence", udf.KLDivergence.udf)
spark.udf.register("pip_min", udf.pip_min.udf)
spark.udf.register("pip_max", udf.pip_max.udf)
spark.udf.register("pip_p_between", udf.pip_p_between.udf)
spark.udf.register("pip_histogram", udf.pip_histogram.udf)
spark.udf.register("pip_export", udf.Export.udf)
spark.udf.register("pip_hc_bottom_up", lib.HierarchicalClustering.bottomUpUdf.udf)
spark.udf.register("pip_hc_list_thresholds", lib.HierarchicalClustering.listThresholdsUdf.udf)
spark.udf.register("pip_hc_extract_clusters", lib.HierarchicalClustering.extractClustersUdf.udf)
// Aggregates
spark.udf.register("uniform_mixture", distribution.numerical.NumericalMixture.uniform)

View File

@@ -104,20 +104,25 @@ object Clamp
)
}
override def approximateCDF(value: Double, params: Any, samples: Int): Double =
override def approximateCDF(value: Double, params: Any, samples: Int, leadingEdge: Boolean = false): Double =
{
val child = params.asInstanceOf[Params]
if(child.family.approximateCDFIsFast(params))
{
val lowBound = child.low.map { child.family.approximateCDF(_, child.params, 1000) }.getOrElse { 0.0 }
val highBound = child.high.map { child.family.approximateCDF(_, child.params, 1000) }.getOrElse { 1.0 }
val actual = child.family.approximateCDF(value, child.params, 1000)
val actual = child.family.approximateCDF(value, child.params, 1000, leadingEdge)
// println(s"CDF of $value @ Clamp Bounds: [${child.low} -> $lowBound, ${child.high} -> $highBound]: ${child.family.describe(child.params)}")
if(actual < lowBound){ return 0.0 }
if(actual > highBound){ return 1.0 }
if(leadingEdge){
if(actual <= lowBound){ return 0.0 }
if(actual > highBound){ return 1.0 }
} else {
if(actual < lowBound){ return 0.0 }
if(actual >= highBound){ return 1.0 }
}
return (actual - lowBound) / (highBound - lowBound)
} else {
super.approximateCDF(value, params, samples)
super.approximateCDF(value, params, samples, leadingEdge)
}
}
override def approximateCDFIsFast(params: Any): Boolean =
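
The renormalization above is the usual truncated-distribution CDF: the base CDF is rescaled so that the clamp bounds map to 0 and 1, and values outside the bounds are clipped. A minimal standalone sketch of that idea (F stands for any base CDF; this is not the library API):

    def truncatedCdf(F: Double => Double, low: Double, high: Double)(v: Double): Double = {
      val lo = F(low)    // mass below the lower clamp bound
      val hi = F(high)   // mass up to the upper clamp bound
      if (v <= low) 0.0
      else if (v >= high) 1.0
      else (F(v) - lo) / (hi - lo)   // rescale the surviving mass onto [0, 1]
    }

The leadingEdge flag only changes which boundary comparison is strict: with it set, a value whose CDF equals the lower bound's is still clipped to 0; without it, a value whose CDF reaches the upper bound's is clipped to 1.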

View File

@@ -36,11 +36,12 @@ object ConstantNumber
def min(params: Any) = params.asInstanceOf[Double]
def max(params: Any) = params.asInstanceOf[Double]
def cdf(value: Double, params: Any): Double =
def cdf(value: Double, params: Any, leadingEdge: Boolean = false): Double =
{
val p = params.asInstanceOf[Double]
if(value < p) { 0.0 }
else { 1.0 }
if(leadingEdge && value <= p) { 0.0 }
else if(value < p) { 0.0 }
else { 1.0 }
}
def icdf(value: Double, params: Any): Double =
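
The new leadingEdge flag selects between the two CDF conventions P(X <= v) (the default) and P(X < v); for a point mass they differ only at the mass point itself, which is exactly what the branch above encodes. The same rule as a standalone sketch:

    // CDF of a point mass at p under both conventions:
    //   leadingEdge = false:  P(X <= v)  -> 1.0 as soon as v reaches p
    //   leadingEdge = true:   P(X <  v)  -> still 0.0 at v == p
    def pointMassCdf(p: Double, v: Double, leadingEdge: Boolean): Double =
      if (leadingEdge) { if (v <= p) 0.0 else 1.0 }
      else             { if (v < p)  0.0 else 1.0 }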

View File

@@ -49,7 +49,7 @@ object Discretized
return x * (bins.head.high - bins.head.low) + bins.head.low
}
def cdf(value: Double, params: Any): Double =
def cdf(value: Double, params: Any, leadingEdge: Boolean = false): Double =
{
params.asInstanceOf[Params].map { bin =>
if(value >= bin.high){ bin.p }
@@ -136,28 +136,32 @@ object Discretized
val params:Params =
if(baseFamily.approximateCDFIsFast(base.params)){
val startCDF = baseFamily.approximateCDF(bins.head, base.params, 1000)
val endCDF = baseFamily.approximateCDF(bins.last, base.params, 1000)
val startCDF = baseFamily.approximateCDF(bins.head, base.params, 1000, leadingEdge = true)
val endCDF = baseFamily.approximateCDF(bins.last, base.params, 1000, leadingEdge = false)
val adjustCDF = endCDF - startCDF
var lastCDF = startCDF
var lastBin = bins.head
assert(adjustCDF > 0, s"Error histogramming $base Using CDF [${bins.head} - ${bins.last}]: $startCDF; $endCDF; $adjustCDF")
// println(s"Fast Path: $startCDF")
bins.tail.map { binHigh =>
val binLow = lastBin
var cdf = baseFamily.approximateCDF(binHigh, base.params, 1000)
var cdf = baseFamily.approximateCDF(binHigh, base.params, 1000, leadingEdge = false)
val result = Bin(binLow, binHigh, (cdf - lastCDF) / adjustCDF)
lastCDF = cdf
lastCDF = baseFamily.approximateCDF(binHigh, base.params, 1000, leadingEdge = true)
lastBin = binHigh
result
}:Params
} else {
// println(s"For $base, sampling histogram")
val counts = Array.fill(bins.size-1)(0)
var missed = 0
for(i <- 0 until samples)
{
val sample = base.family.sample(base, scala.util.Random).asInstanceOf[Double]
val bin = bins.search(sample)
// println(s"Sample: $sample")
if(bin.insertionPoint == 0 || bin.insertionPoint > bins.size){
// println(s" MISSED")
missed += 1
} else {
counts(bin.insertionPoint - 1) += 1
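
On the fast path, each bin's probability is the CDF difference between its two edges, renormalized by the total mass covered by the full bin range; the leading-edge call at the first edge and the trailing-edge call at the last ensure that mass sitting exactly on the outermost edges still counts as inside the range. When no fast CDF is available, the distribution is sampled instead and the samples are tallied into bins, with anything falling outside the range counted as missed. A minimal sketch of the CDF-difference idea, assuming a closed-form cdf and ignoring the edge distinction (which only matters for distributions with point masses; not the library API):

    // Mass between each consecutive pair of edges, renormalized to the range
    // the bin list covers.
    def binMasses(cdf: Double => Double, bins: Array[Double]): Array[Double] = {
      val lo = cdf(bins.head)
      val hi = cdf(bins.last)
      bins.sliding(2).map { case Array(l, h) => (cdf(h) - cdf(l)) / (hi - lo) }.toArray
    }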

View File

@@ -54,7 +54,7 @@ object Gaussian
def min(params: Any) = Double.NegativeInfinity
def max(params: Any) = Double.PositiveInfinity
def cdf(value: Double, params: Any): Double =
def cdf(value: Double, params: Any, leadingEdge: Boolean = false): Double =
(
1 + Erf.erf(
(value - params.asInstanceOf[Params].mean)
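
The (truncated) expression above is the standard erf-based closed form for the normal CDF, Phi(v) = (1 + erf((v - mean) / (stddev * sqrt(2)))) / 2. Because a Gaussian puts no mass on any single point, P(X < v) and P(X <= v) coincide, so the new leadingEdge parameter is accepted but does not change the result.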

View File

@@ -73,10 +73,10 @@ object NumericalMixture
bin.family.max(bin.params)
}.max
override def approximateCDF(value: Double, params: Any, samples: Int): Double =
override def approximateCDF(value: Double, params: Any, samples: Int, leadingEdge: Boolean = false): Double =
{
params.asInstanceOf[Params].map { bin =>
bin.p * bin.family.approximateCDF(value, bin.params, samples)
bin.p * bin.family.approximateCDF(value, bin.params, samples, leadingEdge)
}.sum
}
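
The mixture CDF is simply the weighted sum of the component CDFs, F(v) = sum_i p_i * F_i(v), so the only change needed here is forwarding the leadingEdge flag to each component.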

View File

@@ -51,12 +51,13 @@ object Uniform
def min(params: Any) = params.asInstanceOf[Params].min
def max(params: Any) = params.asInstanceOf[Params].max
def cdf(value: Double, params: Any): Double =
def cdf(value: Double, params: Any, leadingEdge: Boolean = false): Double =
{
val p = params.asInstanceOf[Params]
if(value < p.min) { 0.0 }
else if(value >= p.max) { 1.0 }
else { (value - p.min) / p.width }
if(!leadingEdge && value <= p.min) { 0.0 }
else if(value < p.min) { 0.0 }
else if(value > p.max) { 1.0 }
else { (value - p.min) / p.width }
}
def icdf(value: Double, params: Any): Double =

View File

@@ -13,14 +13,18 @@ trait NumericalDistributionFamily extends DistributionFamily
/**
* Compute the CDF
*/
def approximateCDF(value: Double, params: Any, samples: Int): Double =
def approximateCDF(value: Double, params: Any, samples: Int, leadingEdge: Boolean = false): Double =
this match {
case c:CDFSupported => c.cdf(value, params)
case _ =>
{
val rand = new scala.util.Random()
(0 until samples).count { _ =>
sample(params, rand).asInstanceOf[Double] <= value
if(leadingEdge){
sample(params, rand).asInstanceOf[Double] < value
} else {
sample(params, rand).asInstanceOf[Double] <= value
}
}.toDouble / samples
}
}
@@ -39,7 +43,7 @@ trait CDFSupported
assert(baseType == DoubleType, "Non-numerical distributions can not support CDFs")
def cdf(value: Double, params: Any): Double
def cdf(value: Double, params: Any, leadingEdge: Boolean = false): Double
}
/**
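
For families without a closed-form CDF, the fallback is a plain Monte Carlo estimate: draw samples and count the fraction that land below the query point, with a strict or non-strict comparison depending on leadingEdge. A self-contained sketch of the same estimator (draw is any sampler; the signature is illustrative, not the trait's):

    def mcCdf(draw: () => Double, v: Double, samples: Int, leadingEdge: Boolean = false): Double =
      (0 until samples).count { _ =>
        val x = draw()
        if (leadingEdge) x < v else x <= v   // P(X < v) vs. P(X <= v)
      }.toDouble / samples

    // e.g. mcCdf(() => scala.util.Random.nextGaussian(), 0.0, 10000) is close to 0.5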

View File

@@ -13,6 +13,13 @@ class DistSummary(dimensions: Seq[DistSummary.Feature], root: DistSummary.Node)
{
root.insert(address, dist)
}
def regions(): Seq[Array[DistSummary.Feature]] =
{
val buffer = mutable.Buffer[Array[DistSummary.Feature]]()
root.regions(dimensions, buffer)
buffer.toSeq
}
}
object DistSummary
@@ -20,6 +27,7 @@ object DistSummary
sealed trait Node
{
def insert(address: Array[Double], dist: UnivariateDistribution): Unit
def regions(dimensions: Seq[DistSummary.Feature], buffer: mutable.Buffer[Array[DistSummary.Feature]]): Unit
}
case class InnerNode(split: Split, left: Node, right: Node) extends Node
@@ -35,11 +43,37 @@ object DistSummary
case c:CategoricalSplit =>
???
}
def regions(dimensions: Seq[DistSummary.Feature], buffer: mutable.Buffer[Array[DistSummary.Feature]]): Unit =
{
split match {
case c:ContinuousSplit =>
val (leftFeat, rightFeat) = dimensions(c.featureIndex)
.asInstanceOf[ContinuousFeature]
.split(c.threshold)
val before = dimensions.take(c.featureIndex)
val after = dimensions.drop(c.featureIndex+1)
left.regions(
before ++ Seq(leftFeat) ++ after,
buffer
)
right.regions(
before ++ Seq(rightFeat) ++ after,
buffer
)
case c:CategoricalSplit =>
???
}
}
}
case class LeafNode(distributions: mutable.ArrayBuffer[UnivariateDistribution] = mutable.ArrayBuffer.empty) extends Node
{
def insert(address: Array[Double], dist: UnivariateDistribution): Unit =
distributions.append(dist)
def regions(dimensions: Seq[DistSummary.Feature], buffer: mutable.Buffer[Array[DistSummary.Feature]]): Unit =
{
buffer.append(dimensions.toArray)
}
}
trait Feature
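
regions() enumerates one bounding box per leaf of the split tree: at every continuous split the current feature's interval is narrowed on the left or right, and whatever bounds reach a leaf are emitted. A standalone sketch of that traversal over plain (low, high) intervals, with illustrative types rather than the library's Feature and Split classes:

    sealed trait Tree
    case class SplitNode(dim: Int, threshold: Double, left: Tree, right: Tree) extends Tree
    case object Leaf extends Tree

    // Collect the per-dimension bounds of every leaf region.
    def leafRegions(t: Tree, bounds: Vector[(Double, Double)]): Seq[Vector[(Double, Double)]] =
      t match {
        case Leaf => Seq(bounds)
        case SplitNode(dim, thr, l, r) =>
          val (lo, hi) = bounds(dim)
          leafRegions(l, bounds.updated(dim, (lo, thr))) ++   // left child: upper bound tightens
          leafRegions(r, bounds.updated(dim, (thr, hi)))      // right child: lower bound tightens
      }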

View File

@@ -11,11 +11,9 @@ trait Distance[I]
object Distance
{
trait Manhattan extends Distance[Double]
case class Manhattan(dimensions: Int, min: Double, max: Double) extends Distance[Double]
{
val dimensions: Int
val min: Double
val max: Double
def pointToPlane(a: Array[Double], b: Double, dim: Int): Double =
{
Math.abs(a(dim) - b)
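
Manhattan here is the L1 metric; pointToPlane is its one-dimensional specialization, the absolute offset from an axis-aligned plane. For reference, the full point-to-point distance (a sketch, not part of the commit):

    // L1 (Manhattan) distance between two points of equal dimension.
    def manhattanDistance(a: Array[Double], b: Array[Double]): Double =
      a.zip(b).map { case (x, y) => math.abs(x - y) }.sum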

View File

@@ -5,6 +5,8 @@ import scala.reflect.ClassTag
import scala.collection.mutable.PriorityQueue
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Stack
import org.apache.spark.sql.functions
import org.mimirdb.pip.util.ByteArrayUtils
object HierarchicalClustering
{
@@ -230,7 +232,7 @@ object HierarchicalClustering
}
}
sealed trait Cluster[I, V]
sealed trait Cluster[I, V] extends Serializable
{
def radius: Double
def render(firstPrefix: String, restPrefix: String): String
@@ -280,4 +282,102 @@ object HierarchicalClustering
}
def encode[I,V](cluster: Cluster[I,V]): Array[Byte] =
{
ByteArrayUtils.encode { buf =>
val todos = mutable.Stack[Either[Cluster[I,V], Group[I,V]]](Left(cluster))
while(!todos.isEmpty)
{
todos.pop match {
case Left(v: Singleton[I,V]) =>
buf.writeInt(1)
buf.writeObject(v.position)
buf.writeObject(v.value)
case Left(v: Group[I,V]) =>
todos.push(Right(v))
todos.push(Left(v.left))
todos.push(Left(v.right))
case Right(v) =>
buf.writeInt(2)
buf.writeDouble(v.radius)
buf.writeInt(v.size)
}
}
buf.writeInt(3)
}
}
def decode[I,V](data: Array[Byte]): Cluster[I,V] =
{
ByteArrayUtils.decode(data) { buf =>
val decoded = mutable.Stack[Cluster[I,V]]()
var mode = buf.readInt()
while(mode != 3)
{
mode match {
case 1 => {
val position = buf.readObject().asInstanceOf[Array[I]]
val value = buf.readObject().asInstanceOf[V]
decoded.push(Singleton(position, value))
}
case 2 => {
val left = decoded.pop()
val right = decoded.pop()
val radius = buf.readDouble()
val size = buf.readInt()
decoded.push(Group(left, right, radius, size))
}
}
mode = buf.readInt()
}
decoded.pop()
}
}
object bottomUpUdf {
def apply(elements: (Array[(Array[Double], String)]), min: Double, max: Double, method: String): Array[Byte] =
encode(
HierarchicalClustering.bottomUp(elements,
method match {
case "manhattan" => Distance.Manhattan(
dimensions = elements.head._1.size,
min = min,
max = max,
)
}
)
)
def udf = functions.udf(apply(_, _, _, _))
}
object listThresholdsUdf {
def apply(clusters: Array[Byte]): Array[Double] =
decode[Array[Double],String](clusters)
.orderedIterator
.map { x => x.radius }
.takeWhile { _ > 0.0 }
.toArray
def udf = functions.udf(apply(_))
}
object extractClustersUdf {
def apply(clusters: Array[Byte], threshold: Double): Array[(String, Int)] =
decode[Array[Double],String](clusters)
.threshold(threshold)
.zipWithIndex
.flatMap { case (cluster, idx) =>
cluster.elements.map { element =>
(element.value, idx)
}
}
.toArray
def udf = functions.udf(apply(_, _))
}
}
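
A hypothetical end-to-end use of the three new UDFs from Spark SQL (the table grouped and the column points are invented; points is assumed to already hold the array of (position, label) pairs that bottomUpUdf expects): build the serialized cluster tree, inspect its merge radii to pick a cut point, then extract flat clusters below that threshold.

    // 1. Build a cluster tree using the Manhattan distance over coordinates in [0, 1].
    val trees = spark.sql(
      "SELECT pip_hc_bottom_up(points, 0.0, 1.0, 'manhattan') AS tree FROM grouped"
    )
    trees.createOrReplaceTempView("trees")

    // 2. The nonzero merge radii of the tree ...
    spark.sql("SELECT pip_hc_list_thresholds(tree) FROM trees").show()

    // 3. ... and the (label, cluster id) pairs after cutting at an example threshold.
    spark.sql("SELECT pip_hc_extract_clusters(tree, 0.25) FROM trees").show()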

View File

@@ -7,7 +7,7 @@ import org.apache.spark.sql.functions
object KLDivergence
{
val BUCKETS = 1000
val BUCKETS = 100
def apply(target: UnivariateDistribution, base: UnivariateDistribution): Double =
{
@@ -16,13 +16,46 @@ object KLDivergence
Discretized.klDivergence(target.params, base.params)
case (_:NumericalDistributionFamily, Discretized) =>
Discretized.klDivergence(
Discretized(target, Discretized.bins(base.params), 1000),
Discretized(target, Discretized.bins(base.params), BUCKETS),
base.params
)
case (Discretized, _:NumericalDistributionFamily) =>
Discretized.klDivergence(
target.params,
Discretized(base, Discretized.bins(target.params), 1000),
Discretized(base, Discretized.bins(target.params), BUCKETS),
)
case (targetFam:NumericalDistributionFamily, baseFam:NumericalDistributionFamily) =>
val min =
Math.max(
Double.MinValue,
Math.min(
targetFam.min(target.params),
baseFam.min(base.params),
)
)
val max =
{
val tmp =
Math.min(
Double.MaxValue,
Math.max(
targetFam.max(target.params),
baseFam.max(base.params),
)
)
if(min < tmp){ tmp }
else { min + 1.0 }
}
// println(s"BINS FROM $min - $max")
val bins =
(0 to BUCKETS).map { i =>
min + ((max - min) / BUCKETS) * i
}.toArray
Discretized.klDivergence(
Discretized(target, bins, BUCKETS),
Discretized(base, bins, BUCKETS),
)
}
}
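
The new case handles two continuous distributions by laying BUCKETS + 1 evenly spaced bin edges across the union of their supports (clamped to finite doubles, and widened if the range would otherwise be empty), discretizing both onto those shared edges, and comparing the resulting histograms. The quantity being computed is the discrete KL divergence; a standalone sketch over matching bin masses (bins where either side has zero mass are simply skipped here, a simplification):

    // KL(P || Q) = sum_i p(i) * ln(p(i) / q(i)) over matching histogram bins.
    def discreteKL(p: Array[Double], q: Array[Double]): Double =
      p.zip(q).collect {
        case (pi, qi) if pi > 0.0 && qi > 0.0 => pi * math.log(pi / qi)
      }.sum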

View File

@@ -36,4 +36,21 @@ object pip_p_between
}
def udf = functions.udf(apply(_, _, _))
}
object pip_histogram
{
def apply(low: Double, high: Double, buckets: Int, target: UnivariateDistribution): Array[Double] =
{
val bucketStep = (high - low) / (buckets)
(0 until buckets).map { idx =>
pip_p_between(
bucketStep * (idx + 0.0),
target,
bucketStep * (idx + 1.0),
)
}.toArray
}
def udf = functions.udf(apply(_, _, _, _))
}
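
pip_histogram returns one probability per bucket, each computed with pip_p_between over consecutive slices of width (high - low) / buckets. A hypothetical call from Spark SQL (the table measurements and the column dist are invented):

    // Ten equal-width buckets over [0, 1] for each row's distribution.
    val hist = spark.sql(
      "SELECT pip_histogram(0.0, 1.0, 10, dist) AS hist FROM measurements"
    )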

View File

@@ -0,0 +1,34 @@
package org.mimirdb.pip.util
import java.io.{
ByteArrayOutputStream,
ByteArrayInputStream,
ObjectOutputStream,
ObjectInputStream,
}
object ByteArrayUtils
{
def serialize(o: Any): Array[Byte] =
encode { (_: ObjectOutputStream).writeObject(o) }
def encode( op: ObjectOutputStream => Unit ): Array[Byte] =
{
val byteBuffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(byteBuffer)
op(out)
out.flush()
return byteBuffer.toByteArray()
}
def deserialize[T <: Serializable](data: Array[Byte]): T =
decode(data)((_:ObjectInputStream).readObject().asInstanceOf[T])
def decode[T](data: Array[Byte])(op: ObjectInputStream => T): T =
{
val bis = new ByteArrayInputStream(data)
val in = new ObjectInputStream(bis)
op(in)
}
}
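
The new helpers wrap the standard Java object streams: serialize and deserialize round-trip a single Serializable value, while encode and decode hand the caller the raw ObjectOutputStream / ObjectInputStream for hand-rolled formats such as the cluster encoding above. A small hypothetical round trip:

    case class Point(x: Double, y: Double)   // case classes are Serializable

    val bytes: Array[Byte] = ByteArrayUtils.serialize(Point(1.0, 2.0))
    val back: Point = ByteArrayUtils.deserialize[Point](bytes)
    assert(back == Point(1.0, 2.0))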