Full demo should now be viable. Merging with Vizier tomorrow.

main
Oliver Kennedy 2024-01-28 02:17:32 -05:00
parent be38cd2415
commit 3c65411e05
Signed by: okennedy
GPG Key ID: 3E5F9B3ABD3FDB60
6 changed files with 70 additions and 5 deletions

View File

@@ -25,6 +25,7 @@ object Pip {
    spark.udf.register("clamp", distribution.Clamp.udf)
    spark.udf.register("discretize", distribution.Discretized.udf)
    spark.udf.register("entropy", udf.Entropy.udf)
    spark.udf.register("kl_divergence", udf.KLDivergence.udf)
    // Aggregates
    spark.udf.register("uniform_mixture", distribution.NumericalMixture.uniform)

View File

@@ -93,12 +93,39 @@ object Discretized
        else { 0 }
      }.sum
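  /** KL divergence D(target || base) between two discretized distributions
   *  that share the same bins: the sum over bins of t.p * log(t.p / b.p).
   *  Empty target bins contribute 0; a bin where the target has mass but the
   *  base does not yields +Infinity.
   */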
  def klDivergence(target: Any, base: Any): Double =
  {
    assert(sameBins(target, base))
    target.asInstanceOf[Params]
      .zip(base.asInstanceOf[Params])
      .map { case (t:Bin, b:Bin) =>
        if(t.p > 0){
          if(b.p > 0){
            t.p * Math.log(t.p / b.p)
          } else {
            Double.PositiveInfinity
          }
        } else { 0 }
      }.sum
  }
  def min(params: Any): Double = params.asInstanceOf[Params].head.low
  def max(params: Any): Double = params.asInstanceOf[Params].last.high
  def describe(params: Any): String =
    s"Discretized(${params.asInstanceOf[Params].map { b => s"[${b.low}, ${b.high}] -> ${b.p}" }.mkString(", ")})"
  def bins(params: Any): Array[Double] =
    (params.asInstanceOf[Params].head.low +:
      params.asInstanceOf[Params].map { _.high }
    ).toArray
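  /** True iff both parameter lists describe identical bin boundaries. */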
  def sameBins(paramsA: Any, paramsB: Any): Boolean =
  {
    (paramsA.asInstanceOf[Params].size == paramsB.asInstanceOf[Params].size) &&
    paramsA.asInstanceOf[Params].zip(paramsB.asInstanceOf[Params])
      .forall { case (a, b) => a.low == b.low && a.high == b.high }
  }
  def apply(base: UnivariateDistribution, bins: Array[Double], samples: Int): UnivariateDistribution =
  {

View File

@@ -40,8 +40,8 @@ object Gaussian
    )
  }
  def min(params: Any) = Double.MinValue
  def max(params: Any) = Double.MaxValue
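  // A Gaussian has unbounded support, so report +/- infinity rather than the
  // most extreme finite Doubles.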
  def min(params: Any) = Double.NegativeInfinity
  def max(params: Any) = Double.PositiveInfinity
  def cdf(value: Double, params: Any): Double =
    (

View File

@@ -7,7 +7,7 @@ import org.apache.spark.sql.functions
object Entropy
{
  val buckets = 1000
  val BUCKETS = 1000
  def apply(dist: UnivariateDistribution): Double =
  {
@@ -17,7 +17,7 @@ object Entropy
      case family:NumericalDistributionFamily =>
        val min = family.min(dist.params)
        val max = family.max(dist.params)
        val step = (max - min) / buckets
        val step = (max - min) / BUCKETS
        val params = Discretized(dist, (min.until(max, step)).toArray, 1000).params
        Discretized.entropy(params)

View File

@@ -0,0 +1,31 @@
package org.mimirdb.pip.udf

import org.mimirdb.pip.udt.UnivariateDistribution
import org.mimirdb.pip.distribution.Discretized
import org.mimirdb.pip.distribution.NumericalDistributionFamily
import org.apache.spark.sql.functions

object KLDivergence
{
  val BUCKETS = 1000
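  /** KL divergence between two univariate distributions.
   *
   *  When both are discretized over the same bins, compute the divergence
   *  directly; when only one side is discretized, re-discretize the other
   *  onto its bins first.  Two non-discretized distributions are not yet
   *  handled (the match will fail at runtime), and BUCKETS is currently
   *  unused.
   */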
  def apply(target: UnivariateDistribution, base: UnivariateDistribution): Double =
  {
    (target.family, base.family) match {
      case (Discretized, Discretized) if Discretized.sameBins(target.params, base.params) =>
        Discretized.klDivergence(target.params, base.params)
      case (_:NumericalDistributionFamily, Discretized) =>
        Discretized.klDivergence(
          Discretized(target, Discretized.bins(base.params), 1000).params,
          base.params
        )
      case (Discretized, _:NumericalDistributionFamily) =>
        Discretized.klDivergence(
          target.params,
          Discretized(base, Discretized.bins(target.params), 1000).params
        )
    }
  }

  def udf = functions.udf(apply(_,_))
}

View File

@@ -177,7 +177,13 @@ object Main
    df = spark.sql("""
      SELECT id,
             m_per_sol,
             entropy(m_per_sol) as entropy
             entropy(m_per_sol) as entropy,
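             -- how far the most divergent mixture component strays from the
             -- aggregate m_per_sol distribution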
             array_max(
               transform(
                 components,
                 x -> kl_divergence(x, m_per_sol)
               )
             ) as max_kl_div
             -- components
      FROM grid_squares
      -- LIMIT 1