WIP: Multiple distribution types, logic over distributions

main
Oliver Kennedy 2024-01-28 01:48:20 -05:00
parent 68aa5fe448
commit be38cd2415
Signed by: okennedy
GPG Key ID: 3E5F9B3ABD3FDB60
18 changed files with 1002 additions and 442 deletions

View File

@ -18,10 +18,15 @@ object mimir_pip extends RootModule with ScalaModule {
def moduleDeps = Seq(lib)
def repositoriesTask = T.task {
super.repositoriesTask() ++
lib.repositoriesTask()
}
def repositoriesTask = T.task { super.repositoriesTask() ++ Seq(
MavenRepository("https://repo.osgeo.org/repository/release/"),
)}
def ivyDeps = Agg(
ivy"org.apache.sedona:sedona-common:1.5.0",
ivy"org.apache.sedona::sedona-spark-shaded-3.0:1.5.0",
ivy"org.datasyslab:geotools-wrapper:1.5.0-28.2"
)
def test = lib.test
@ -30,16 +35,11 @@ object mimir_pip extends RootModule with ScalaModule {
def scalaVersion = "2.12.15"
def repositoriesTask = T.task { super.repositoriesTask() ++ Seq(
MavenRepository("https://repo.osgeo.org/repository/release/"),
)}
def ivyDeps = Agg(
ivy"org.apache.spark::spark-sql:3.3.1",
ivy"org.apache.spark::spark-core:3.3.1",
ivy"org.apache.sedona:sedona-common:1.5.0",
ivy"org.apache.sedona::sedona-spark-shaded-3.0:1.5.0",
ivy"org.datasyslab:geotools-wrapper:1.5.0-28.2"
ivy"org.apache.commons:commons-math3:3.6.1"
)
object test extends ScalaTests with TestModule {

View File

@ -1,83 +0,0 @@
package org.mimirdb.pip
import scala.util.Random
import java.io.Serializable
/* Gaussian object. NOTE: Scala uses CamelCase; create a new class file for this UDT and its associated methods */
final case class Gauss(mean: Float, sd: Float) {
def generate_value():Float = {
Random.nextGaussian().asInstanceOf[Float] * sd + mean
/*
I think this may be an approximation, i.e., I am not certain that
scaling by sd and then adding the mean is the exact computation.
We will probably need one more class to represent the random variable.
*/
}
def gaussPlus(operand: Gauss): Gauss = {
val new_mean = this.mean + operand.mean
val new_sd = math.sqrt(math.pow(this.sd, 2) + math.pow(operand.sd, 2))
Gauss(new_mean, new_sd.asInstanceOf[Float])
}
def gaussMinus(operand: Gauss): Gauss = {
this.gaussPlus(Gauss(operand.mean * (-1), operand.sd * (-1)))
}
def gaussMult(operand: Gauss): Gauss = {
/* https://ccrma.stanford.edu/~jos/sasp/Product_Two_Gaussian_PDFs.html
I am following the formula at the above link for taking the product of two Gaussians.
Other sources disagree, so I am not sure this is correct; for now, the formula at the link above is the one
being followed.
*/
val new_mean = (this.mean * math.pow(operand.sd, 2) + operand.mean * math.pow(this.sd, 2)) / (math.pow(this.sd, 2) + math.pow(operand.sd, 2))
val new_sd = (math.pow(this.sd, 2) * math.pow(operand.sd, 2)) / (math.pow(this.sd, 2) + math.pow(operand.sd, 2))
Gauss(new_mean.asInstanceOf[Float], new_sd.asInstanceOf[Float])
}
override def toString(): String = {
s"Gauss(mean: ${mean}, stdiv: ${sd})"
}
/*
Comparison < prototype.
Original idea:
i) Take in a general parameter 'other'
ii) match it to its true type
iii) return the result of the comparison which could be an error
Difficulties:
*/
def <(other: Any): Boolean = other match {
case i: Integer => this < i.asInstanceOf[Float]
case f: Float => f match {
case y if (this.mean + (this.sd * 3) < y) => true //assumption here that 0.3% of the probability space will not matter
case _ => false
}
case d: Double => this < d.asInstanceOf[Float]
case g: Gauss => g match {
case y if (y.mean >= this.mean && y.sd >= this.sd) => false
case n if (n.mean < this.mean && n.sd < this.sd) => true
case _ => ???//here is where we make an estimate and give a confidence level
}
case _ => ???// this should raise an exception of incompatible types; how to do this in spark/scala ecosystem?
}
def >=(other: Any): Boolean = !(this < other)
def >(other: Any): Boolean = other match {
case i: Integer => this > i.asInstanceOf[Float]
case f: Float => f match {
case y if (this.mean + (3 * this.sd) > y) => true
case n if (this.mean + (3 * this.sd) <= n )=> false
case _ => ???//here is where we make an estimate and give a confidence level
}
case d: Double => this > d.asInstanceOf[Float]
case _ => ???//incompatible types exception
}
def <=(other: Any): Boolean = !(this > other)
}//this needs to be serializable; is this as simple as object Gauss extends serializable?
object Gauss extends Serializable

View File

@ -1,5 +0,0 @@
package org.mimirdb.pip
trait GaussDistribution {
}

View File

@ -1,60 +0,0 @@
package org.mimirdb.pip
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.hadoop.shaded.com.nimbusds.jose.util.StandardCharset
import com.fasterxml.jackson.module.scala.deser.overrides
class GaussType extends UserDefinedType[Gauss]{
/*
Gauss is the user-facing object being represented.
It is customary to define a companion object that inherits from Gauss.
sqlType is the internal type that Spark already knows how to handle: the datatype that
Spark uses for serialized instances.
*/
override def equals(other: Any): Boolean = other match {
case _: UserDefinedType[_] => other.isInstanceOf[GaussType]
case _ => false
}
override def sqlType: DataType = BinaryType
override def serialize(obj: Gauss): Array[Byte] =
{
val byteBuffer = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(byteBuffer)
out.writeObject(obj)
return byteBuffer.toByteArray()
}
/*
override def serialize(obj: Gauss): Array[Byte] = { obj match {
case Gauss(mean, sd) => s"Gauss(${mean}, ${sd})".getBytes("UTF-8")
case _ => s"ERROR".getBytes("UTF-8")
}
}
*/
/*
When Spark itself is compiled, it doesn't know these types; they are only known once the query runs.
deserialize only ever gets called on an Array[Byte], but that information is not available to
Spark's type checker, so the argument is typed as an arbitrary Any.
The cast is always safe, though, because Spark maintains the invariant that it never calls
deserialize on anything it did not previously serialize.
*/
override def deserialize(datum: Any): Gauss = {
/*
val s_datum = new String(datum.asInstanceOf[Array[Byte]], StandardCharset.UTF_8)
val first_split = s_datum.split("(")
val mean = first_split(1).split(",")(0).asInstanceOf[Float]
val sd = first_split(1).split(",")(1).trim.asInstanceOf[Float]
Gauss(mean, sd)
*/
val bis = new java.io.ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
val obj = new java.io.ObjectInputStream(bis)
return obj.readObject().asInstanceOf[Gauss]
}
override def userClass: Class[Gauss] = classOf[Gauss]
}
case object GaussType extends org.mimirdb.pip.GaussType with Serializable

View File

@ -2,17 +2,33 @@ package org.mimirdb.pip
import org.apache.spark.sql.types.UDTRegistration
import org.apache.spark.sql.functions
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udaf
import distribution.DistributionFamily
object Mimir {
//by convention uppercase first letter of class/object name
//then just model this after the sedona registrator
/**
* Entry points for the Pip plugin
*/
object Pip {
def registerAll(): Unit = {
//val udt_class = "filler_class"
UDTRegistration.register(classOf[Gauss].getName(), classOf[GaussType].getName())
/**
* Initialize Pip with the current spark session
*/
def init(spark: SparkSession): Unit = {
UDTRegistration.register(classOf[udt.RandomVariable].getName(),
classOf[udt.RandomVariableType].getName())
UDTRegistration.register(classOf[udt.UnivariateDistribution].getName(),
classOf[udt.UnivariateDistributionType].getName())
spark.udf.register("gaussian", distribution.Gaussian.udf)
spark.udf.register("clamp", distribution.Clamp.udf)
spark.udf.register("discretize", distribution.Discretized.udf)
spark.udf.register("entropy", udf.Entropy.udf)
// Aggregates
spark.udf.register("uniform_mixture", distribution.NumericalMixture.uniform)
spark.udf.register("histogram", udaf(udf.Histogram))
}
}
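A minimal, REPL-style usage sketch (not part of the commit): once Pip.init has been called, the registered functions are usable directly from Spark SQL. The literal parameters below are arbitrary.
  import org.apache.spark.sql.SparkSession
  import org.mimirdb.pip.Pip

  val spark = SparkSession.builder().master("local[*]").appName("pip-example").getOrCreate()
  Pip.init(spark)
  // gaussian(mean, sd) builds a UnivariateDistribution column; clamp restricts its support
  val row = spark.sql("SELECT clamp(gaussian(10.0, 3.0), 0.0, 20.0) AS dist").head()
  println(row.get(0))  // prints the distribution via UnivariateDistribution.toString / describe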

View File

@ -1,5 +0,0 @@
package org.mimirdb.pip
final case class RandomVariable(i: ()=> Float){
}

View File

@ -0,0 +1,27 @@
package org.mimirdb.pip
import java.util.UUID
case class SampleParams(
uuid: UUID,
seed: Long
)
{
lazy val random =
new scala.util.Random(uuid.hashCode() ^ seed)
/**
* Resets the sampler for use with a fresh variable in the same world
*/
def forUUID(uuid: UUID) =
copy(uuid = uuid)
}
object SampleParams
{
def fullyRandom(): SampleParams =
SampleParams(
uuid = UUID.randomUUID(),
seed = scala.util.Random.nextLong
)
}
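A small REPL-style sketch of the intended use (my reading of the API; the UUIDs are arbitrary): one SampleParams fixes a "world", and forUUID re-seeds it per variable so draws are reproducible.
  import java.util.UUID
  import org.mimirdb.pip.SampleParams
  import org.mimirdb.pip.distribution.Gaussian

  val world = SampleParams(uuid = UUID.randomUUID(), seed = 42L)
  val variableId = UUID.randomUUID()
  // Re-seeding with the same world and variable id always yields the same draw
  val x = Gaussian.sample(Gaussian.Params(0.0, 1.0), world.forUUID(variableId).random)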

View File

@ -0,0 +1,127 @@
package org.mimirdb.pip.distribution
import scala.util.Random
import java.io.ObjectOutputStream
import java.io.ObjectInputStream
import org.mimirdb.pip.udt.UnivariateDistribution
import org.apache.spark.sql.functions
object Clamp
extends NumericalDistributionFamily
{
def apply(col: UnivariateDistribution, low: Double, high: Double): UnivariateDistribution =
UnivariateDistribution(this, Params(
family = col.family.asInstanceOf[NumericalDistributionFamily],
params = col.params,
low = Some(low),
high = Some(high)
))
def udf = functions.udf(apply(_, _, _))
case class Params(
family: NumericalDistributionFamily,
params: Any,
low: Option[Double],
high: Option[Double]
)
def describe(params: Any): String =
"Clamp at "+
params.asInstanceOf[Params].low.map { "["+_ }.getOrElse { "(-∞" } + ", " +
params.asInstanceOf[Params].high.map { _.toString+"]" }.getOrElse { "∞)" } + " of " +
params.asInstanceOf[Params].family.describe(params.asInstanceOf[Params].params)
def min(params: Any): Double =
params.asInstanceOf[Params].low.map { Math.max(_,
params.asInstanceOf[Params].family.min(params.asInstanceOf[Params].params)
)}.getOrElse {
params.asInstanceOf[Params].family.min(params.asInstanceOf[Params].params)
}
def max(params: Any): Double =
params.asInstanceOf[Params].high.map { Math.min(_,
params.asInstanceOf[Params].family.max(params.asInstanceOf[Params].params)
)}.getOrElse {
params.asInstanceOf[Params].family.max(params.asInstanceOf[Params].params)
}
def sample(params: Any, random: Random): Any =
{
val child = params.asInstanceOf[Params]
child.family match {
case c:(CDFSupported with ICDFSupported) =>
{
val lowBound = child.low.map { c.cdf(_, child.params) }.getOrElse { 0.0 }
val highBound = child.high.map { c.cdf(_, child.params) }.getOrElse { 1.0 }
val samplePt = random.nextDouble() * (highBound - lowBound) + lowBound
c.icdf(samplePt, child.params)
}
case _ =>
{
child.family.rejectionSample(1000, child.params, random){
case v:Double =>
child.low.map { _ <= v }.getOrElse(true) &&
child.high.map { v <= _ }.getOrElse(true)
}.getOrElse {
throw new RuntimeException(s"Aborting sampling from rare event: ${child.family.describe(child.params)}")
}
}
}
}
def serialize(out: ObjectOutputStream, params: Any): Unit =
{
val child = params.asInstanceOf[Params]
out.writeDouble(child.low.getOrElse { Double.NaN })
out.writeDouble(child.high.getOrElse { Double.NaN })
out.writeUTF(child.family.label)
child.family.serialize(out, child.params)
}
def deserialize(out: ObjectInputStream): Any =
{
val low = out.readDouble() match {
case x if x.isNaN() => None
case x => Some(x)
}
val high = out.readDouble() match {
case x if x.isNaN() => None
case x => Some(x)
}
val family = DistributionFamily(out.readUTF()).asInstanceOf[NumericalDistributionFamily]
val params = family.deserialize(out)
Params(
family = family,
params = params,
low = low,
high = high,
)
}
override def approximateCDF(value: Double, params: Any, samples: Int): Double =
{
val child = params.asInstanceOf[Params]
if(child.family.approximateCDFIsFast(child.params))
{
val lowBound = child.low.map { child.family.approximateCDF(_, child.params, 1000) }.getOrElse { 0.0 }
val highBound = child.high.map { child.family.approximateCDF(_, child.params, 1000) }.getOrElse { 1.0 }
val actual = child.family.approximateCDF(value, child.params, 1000)
// println(s"CDF of $value @ Clamp Bounds: [${child.low} -> $lowBound, ${child.high} -> $highBound]: ${child.family.describe(child.params)}")
if(actual < lowBound){ return 0.0 }
if(actual > highBound){ return 1.0 }
return (actual - lowBound) / (highBound - lowBound)
} else {
super.approximateCDF(value, params, samples)
}
}
override def approximateCDFIsFast(params: Any): Boolean =
params.asInstanceOf[Params].family.approximateCDFIsFast(
params.asInstanceOf[Params].params
)
}
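A usage sketch (assuming Pip.init has registered the UDFs; values are arbitrary): Clamp can be built programmatically or through the clamp SQL function.
  import org.mimirdb.pip.distribution.{ Clamp, Gaussian }

  // Restrict a Gaussian to the interval [0, 800]
  val speed = Clamp(Gaussian(mean = 300.0, sd = 50.0), low = 0.0, high = 800.0)
  println(Clamp.describe(speed.params))
  // Equivalent SQL, mirroring the mars_rover query in Main.scala:
  //   SELECT clamp(gaussian(m_per_sol, 3.0), 0.0, 800.0) FROM traverse_data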

View File

@ -0,0 +1,154 @@
package org.mimirdb.pip.distribution
import scala.util.Random
import java.io.ObjectOutputStream
import java.io.ObjectInputStream
import org.mimirdb.pip.SampleParams
import org.mimirdb.pip.udt.UnivariateDistribution
import org.apache.spark.sql.functions
import scala.collection.Searching._
object Discretized
extends NumericalDistributionFamily
with CDFSupported
{
val ACCURACY = 0.0001
case class Bin(low: Double, high: Double, p: Double)
type Params = Seq[Bin]
def check(bins: Params): Params =
{
assert(!bins.isEmpty)
assert(
Math.abs(bins.map { _.p }.sum - 1.0) < ACCURACY,
s"Unexpected bin boundaries: ${bins.map { _.p }.sum} = ${bins.map { _.p }.mkString(" + ")}"
)
var curr = bins.head.high
for(x <- bins.tail)
{
assert(x.low < x.high)
assert(x.low == curr)
curr = x.high
}
return bins
}
def sample(params: Any, random: scala.util.Random): Double =
{
var x = random.nextDouble()
var bins = params.asInstanceOf[Params]
while(x > bins.head.p && bins.size > 1){
x -= bins.head.p
bins = bins.tail
}
x /= bins.head.p
return x * (bins.head.high - bins.head.low) + bins.head.low
}
def cdf(value: Double, params: Any): Double =
{
params.asInstanceOf[Params].map { bin =>
if(value >= bin.high){ bin.p }
else if(value >= bin.low){
bin.p * ((value - bin.low)/(bin.high - bin.low))
}
else { 0 }
}.sum
}
def serialize(out: ObjectOutputStream, params: Any): Unit =
{
val bins = params.asInstanceOf[Params]
out.writeInt(bins.size)
for(bin <- bins)
{
out.writeDouble(bin.low)
out.writeDouble(bin.high)
out.writeDouble(bin.p)
}
}
def deserialize(in: ObjectInputStream): Params =
{
val len = in.readInt()
check {
(0 until len).map { _ =>
val low = in.readDouble()
val high = in.readDouble()
val p = in.readDouble()
Bin(
low = low,
high = high,
p = p,
)
}.toSeq
}
}
def entropy(params: Any): Double =
params.asInstanceOf[Params].map { bin =>
if(bin.p > 0){ - Math.log(bin.p) * bin.p }
else { 0 }
}.sum
def min(params: Any): Double = params.asInstanceOf[Params].head.low
def max(params: Any): Double = params.asInstanceOf[Params].last.high
def describe(params: Any): String =
s"Discretized(${params.asInstanceOf[Params].map { b => s"[${b.low}, ${b.high}] -> ${b.p}" }.mkString(", ")})"
def apply(base: UnivariateDistribution, bins: Array[Double], samples: Int): UnivariateDistribution =
{
assert(bins.size >= 2)
val baseFamily = base.family.asInstanceOf[NumericalDistributionFamily]
val params:Params =
if(baseFamily.approximateCDFIsFast(base.params)){
val startCDF = baseFamily.approximateCDF(bins.head, base.params, 1000)
val endCDF = baseFamily.approximateCDF(bins.last, base.params, 1000)
val adjustCDF = endCDF - startCDF
var lastCDF = startCDF
var lastBin = bins.head
// println(s"Fast Path: $startCDF")
bins.tail.map { binHigh =>
val binLow = lastBin
var cdf = baseFamily.approximateCDF(binHigh, base.params, 1000)
val result = Bin(binLow, binHigh, (cdf - lastCDF) / adjustCDF)
lastCDF = cdf
lastBin = binHigh
result
}:Params
} else {
val counts = Array.fill(bins.size-1)(0)
var missed = 0
for(i <- 0 until samples)
{
val sample = base.family.sample(base.params, scala.util.Random).asInstanceOf[Double]
val bin = bins.search(sample)
if(bin.insertionPoint == 0 || bin.insertionPoint >= bins.size){
missed += 1
} else {
counts(bin.insertionPoint - 1) += 1
}
}
counts.zipWithIndex.map { case (count, bin) =>
val binLow = bins(bin)
val binHigh = bins(bin+1)
val cdf = count.toDouble / (samples - missed)
Bin(binLow, binHigh, cdf)
}:Params
}
// println(bins.mkString(", "))
check(params)
UnivariateDistribution(
family = this,
params = params
)
}
def udf = functions.udf(apply(_, _, _))
}
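A REPL-style sketch of discretizing a distribution into fixed-width bins (boundaries chosen arbitrarily); the sample count is only used when the base family has no fast CDF.
  import org.mimirdb.pip.distribution.{ Discretized, Gaussian }

  // Eleven boundaries (ten bins) covering [-3, 3]
  val boundaries = (0 to 10).map { i => -3.0 + i * 0.6 }.toArray
  val disc = Discretized(Gaussian(0.0, 1.0), boundaries, samples = 1000)
  println(Discretized.describe(disc.params))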

View File

@ -0,0 +1,131 @@
package org.mimirdb.pip.distribution
import scala.util.Random
import java.io.Serializable
import java.io.ObjectOutputStream
import java.io.ObjectInputStream
import org.apache.commons.math3.special.Erf
import org.apache.spark.sql.Column
import org.mimirdb.pip.SampleParams
import org.mimirdb.pip.udt.UnivariateDistribution
/**
* The Gaussian (normal) distribution
*
*/
object Gaussian
extends NumericalDistributionFamily
with CDFSupported
with ICDFSupported
{
case class Params(mean: Double, sd: Double)
def sample(params: Any, random: scala.util.Random): Double =
{
random.nextGaussian() * params.asInstanceOf[Params].sd +
params.asInstanceOf[Params].mean
}
def serialize(in: ObjectOutputStream, params: Any): Unit =
{
in.writeDouble(params.asInstanceOf[Params].mean)
in.writeDouble(params.asInstanceOf[Params].sd)
}
def deserialize(in: ObjectInputStream): Params =
{
return Params(
mean = in.readDouble(),
sd = in.readDouble()
)
}
def min(params: Any) = Double.MinValue
def max(params: Any) = Double.MaxValue
def cdf(value: Double, params: Any): Double =
(
1 + Erf.erf(
(value - params.asInstanceOf[Params].mean)
/ (params.asInstanceOf[Params].sd * Math.sqrt(2))
)
) / 2.0
def icdf(value: Double, params: Any): Double =
{
Erf.erfInv(value * 2 - 1) * params.asInstanceOf[Params].sd * Math.sqrt(2) +
params.asInstanceOf[Params].mean
}
def apply(mean: Double, sd: Double): UnivariateDistribution =
UnivariateDistribution(this, Params(mean, sd))
def udf = org.apache.spark.sql.functions.udf(apply(_:Double, _:Double))
def apply(mean: Column, sd: Column): Column = udf(mean, sd)
def plus(a: Params, b: Params): Params =
{
val new_mean = a.mean + b.mean
val new_sd = math.sqrt(math.pow(a.sd, 2) + math.pow(b.sd, 2))
Params(new_mean, new_sd)
}
def minus(a: Params, b: Params): Params = {
plus(a, Params(b.mean * -1, b.sd * -1))
}
def mult(a: Params, b: Params): Params = {
/* https://ccrma.stanford.edu/~jos/sasp/Product_Two_Gaussian_PDFs.html
I am following the formula at the above link for taking the product of two Gaussians.
Other sources disagree, so I am not sure this is correct; for now, the formula at the link above is the one
being followed.
*/
val new_mean = (a.mean * math.pow(b.sd, 2) + b.mean * math.pow(a.sd, 2)) / (math.pow(a.sd, 2) + math.pow(b.sd, 2))
val new_sd = (math.pow(a.sd, 2) * math.pow(b.sd, 2)) / (math.pow(a.sd, 2) + math.pow(b.sd, 2))
Params(new_mean, new_sd)
}
def describe(params: Any): String =
s"Gauss(mean: ${params.asInstanceOf[Params].mean}, std-dev: ${params.asInstanceOf[Params].sd})"
// /*
// Comparison < prototype.
// Original idea:
// i) Take in a general parameter 'other'
// ii) match it to its true type
// iii) return the result of the comparison which could be an error
// Difficulties:
// */
// def lt(self: Params, other: Any): Boolean = other match {
// case i: Integer => this < i.asInstanceOf[Float]
// case f: Float => f match {
// case y if (this.mean + (this.sd * 3) < y) => true //assumption here that 0.3% of the probability space will not matter
// case _ => false
// }
// case d: Double => this < d.asInstanceOf[Float]
// case g: Gauss => g match {
// case y if (y.mean >= this.mean && y.sd >= this.sd) => false
// case n if (n.mean < this.mean && n.sd < this.sd) => true
// case _ => ???//here is where we make an estimate and give a confidence level
// }
// case _ => ???// this should raise an exception of incompatible types; how to do this in spark/scala ecosystem?
// }
// def >=(other: Any): Boolean = !(this < other)
// def >(other: Any): Boolean = other match {
// case i: Integer => this > i.asInstanceOf[Float]
// case f: Float => f match {
// case y if (this.mean + (3 * this.sd) > y) => true
// case n if (this.mean + (3 * this.sd) <= n )=> false
// case _ => ???//here is where we make an estimate and give a confidence level
// }
// case d: Double => this > d.asInstanceOf[Float]
// case _ => ???//incompatible types exception
// }
// def <=(other: Any): Boolean = !(this > other)
}
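A quick REPL-style check of the exact CDF/ICDF pair and the parameter arithmetic (values arbitrary):
  import org.mimirdb.pip.distribution.Gaussian

  val p = Gaussian.Params(mean = 2.0, sd = 1.5)
  // icdf inverts cdf, up to floating-point error
  val q = Gaussian.cdf(3.0, p)
  assert(math.abs(Gaussian.icdf(q, p) - 3.0) < 1e-6)
  // Sums of independent Gaussians: means add, variances add
  val s = Gaussian.plus(p, Gaussian.Params(1.0, 2.0))  // mean 3.0, sd = sqrt(1.5^2 + 2^2)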

View File

@ -0,0 +1,116 @@
package org.mimirdb.pip.distribution
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf
import org.mimirdb.pip.udt.UnivariateDistribution
import java.util.UUID
import org.mimirdb.pip.SampleParams
object NumericalMixture
extends NumericalDistributionFamily
{
case class Child(family: NumericalDistributionFamily, params: Any, p: Double)
type Params = Seq[Child]
def sample(params: Any, random: scala.util.Random): Any =
{
var bins = params.asInstanceOf[Params]
var p = random.nextDouble()
while(bins.size > 1 && p > bins.head.p)
{
p -= bins.head.p
bins = bins.tail
}
bins.head
.family
.sample(bins.head.params, random)
}
def describe(params: Any): String =
{
"Mixture of "+params.asInstanceOf[Params].map { bin =>
s"${bin.family.describe(bin.params)}->${bin.p}"
}.mkString("; ")
}
def serialize(out: java.io.ObjectOutputStream, params: Any): Unit =
{
val bins = params.asInstanceOf[Params]
out.writeInt(bins.size)
for(bin <- bins)
{
out.writeDouble(bin.p)
out.writeUTF(bin.family.label)
bin.family.serialize(out, bin.params)
}
}
def deserialize(in: java.io.ObjectInputStream): Any =
{
val len = in.readInt()
(0 until len).map { _ =>
val p = in.readDouble()
val dist = DistributionFamily(in.readUTF()).asInstanceOf[NumericalDistributionFamily]
val params = dist.deserialize(in)
Child(
family = dist,
params = params,
p = p,
)
}
}
def min(params: Any) =
params.asInstanceOf[Params].map { bin =>
bin.family.min(bin.params)
}.min
def max(params: Any) =
params.asInstanceOf[Params].map { bin =>
bin.family.max(bin.params)
}.max
override def approximateCDF(value: Double, params: Any, samples: Int): Double =
{
params.asInstanceOf[Params].map { bin =>
bin.p * bin.family.approximateCDF(value, bin.params, samples)
}.sum
}
override def approximateCDFIsFast(params: Any): Boolean =
params.asInstanceOf[Params].forall { c => c.family.approximateCDFIsFast(c.params) }
object UniformAggregate extends Aggregator[UnivariateDistribution, List[UnivariateDistribution], UnivariateDistribution]
{
type T = List[UnivariateDistribution]
def zero: T = List()
def reduce(buffer: T, dataPoint: UnivariateDistribution): T =
dataPoint +: buffer
def merge(b1: T, b2: T): T =
b1 ++ b2
def finish(reduction: T): UnivariateDistribution =
{
val p = 1.0 / reduction.size
UnivariateDistribution(
family = NumericalMixture,
params =
reduction.map { dist =>
Child(dist.family.asInstanceOf[NumericalDistributionFamily], dist.params, p)
}
)
}
def bufferEncoder: Encoder[T] = ExpressionEncoder()
def outputEncoder: Encoder[UnivariateDistribution] = ExpressionEncoder()
}
val uniform = udaf(UniformAggregate)
}
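A construction sketch (not part of the commit): a two-component mixture built directly from Child records; the same result is available from SQL as the uniform_mixture aggregate registered in Pip.init.
  import org.mimirdb.pip.distribution.{ Gaussian, NumericalMixture }
  import org.mimirdb.pip.udt.UnivariateDistribution

  val mix = UnivariateDistribution(NumericalMixture, Seq(
    NumericalMixture.Child(Gaussian, Gaussian.Params(0.0, 1.0), 0.5),
    NumericalMixture.Child(Gaussian, Gaussian.Params(5.0, 1.0), 0.5)
  ))
  println(mix)  // toString delegates to NumericalMixture.describe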

View File

@ -0,0 +1,135 @@
package org.mimirdb.pip.distribution
import java.io.ObjectOutputStream
import scala.util.Random
import java.io.ObjectInputStream
import org.apache.spark.sql.types.{ DataType, DoubleType }
import scala.collection.mutable
import java.util.UUID
/**
* A random variable distribution.
*/
trait DistributionFamily
{
/**
* The underlying datatype generated by this distribution
*/
val baseType: DataType
/**
* Encode the params for an instance of this distribution
*/
def serialize(out: ObjectOutputStream, params: Any): Unit
/**
* Decode the params for an instance of this distribution
*/
def deserialize(in: ObjectInputStream): Any
/**
* Draw a sample from this distribution. You <b>must</b> generate random numbers using the
* provided random number generator
*/
def sample(params: Any, random: scala.util.Random): Any
/**
* Draw samples until the criterion is met. Returns None if no satisfying sample is found within maxSamples attempts
*/
def rejectionSample(maxSamples: Int, params: Any, random: scala.util.Random)
(criterion: Any => Boolean): Option[Any] =
{
for(i <- 0 until maxSamples){
val v = sample(params, random)
if(criterion(v)){ return Some(v) }
}
return None
}
/**
* Generate a summary of this distribution based on params
*/
def describe(params: Any): String
/**
* A unique label for this distribution
*/
def label = this.getClass.getSimpleName.toLowerCase
}
/**
* A [Distribution] that specifically samples numbers
*/
trait NumericalDistributionFamily extends DistributionFamily
{
val baseType = DoubleType
/**
* Approximate the CDF by sampling, or compute it exactly when the family supports it
*/
def approximateCDF(value: Double, params: Any, samples: Int): Double =
this match {
case c:CDFSupported => c.cdf(value, params)
case _ =>
{
val rand = new scala.util.Random()
(0 until samples).count { _ =>
sample(params, rand).asInstanceOf[Double] <= value
}.toDouble / samples
}
}
def approximateCDFIsFast(params: Any): Boolean = this.isInstanceOf[CDFSupported]
def min(params: Any): Double
def max(params: Any): Double
}
/**
* An add-on to NumericalDistributionFamily that indicates an exact CDF can be computed
*/
trait CDFSupported
{
val baseType: DataType
assert(baseType == DoubleType, "Non-numerical distributions can not support CDFs")
def cdf(value: Double, params: Any): Double
}
/**
* An add-on to NumericalDistributionFamily that indicates an exact Inverse CDF can be computed
*/
trait ICDFSupported
{
val baseType: DataType
assert(baseType == DoubleType, "Non-numerical distributions can not support ICDFs")
def icdf(value: Double, params: Any): Double
}
/**
* Companion object for distributions: Keeps a registry of all known distributions
*/
object DistributionFamily
{
val registered = mutable.Map[String, DistributionFamily]()
def all: Iterable[DistributionFamily] = registered.values
def register(distribution: DistributionFamily): Unit =
registered.put(distribution.label, distribution)
def apply(distribution: String): DistributionFamily =
registered.get(distribution.toLowerCase)
.getOrElse {
throw new IllegalArgumentException(s"Invalid distribution family '$distribution': Available: ${registered.keys.mkString(", ")}")
}
/// Pre-defined distributions
register(Gaussian);
register(NumericalMixture);
register(Clamp)
register(Discretized)
}
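A minimal sketch of what a new family could look like (a uniform distribution, not part of this commit), to illustrate the trait contract; registration would sit alongside the built-ins above.
  import java.io.{ ObjectInputStream, ObjectOutputStream }

  object Uniform extends NumericalDistributionFamily with CDFSupported
  {
    case class Params(low: Double, high: Double)
    def sample(params: Any, random: scala.util.Random): Any =
    {
      val p = params.asInstanceOf[Params]
      random.nextDouble() * (p.high - p.low) + p.low
    }
    def cdf(value: Double, params: Any): Double =
    {
      val p = params.asInstanceOf[Params]
      if(value <= p.low){ 0.0 }
      else if(value >= p.high){ 1.0 }
      else { (value - p.low) / (p.high - p.low) }
    }
    def min(params: Any): Double = params.asInstanceOf[Params].low
    def max(params: Any): Double = params.asInstanceOf[Params].high
    def describe(params: Any): String =
      s"Uniform(${params.asInstanceOf[Params].low}, ${params.asInstanceOf[Params].high})"
    def serialize(out: ObjectOutputStream, params: Any): Unit =
    {
      out.writeDouble(params.asInstanceOf[Params].low)
      out.writeDouble(params.asInstanceOf[Params].high)
    }
    def deserialize(in: ObjectInputStream): Any =
      Params(in.readDouble(), in.readDouble())
  }
  // e.g.: DistributionFamily.register(Uniform)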

View File

@ -0,0 +1,28 @@
package org.mimirdb.pip.udf
import org.mimirdb.pip.udt.UnivariateDistribution
import org.mimirdb.pip.distribution.Discretized
import org.mimirdb.pip.distribution.NumericalDistributionFamily
import org.apache.spark.sql.functions
object Entropy
{
val buckets = 1000
def apply(dist: UnivariateDistribution): Double =
{
dist.family match {
case Discretized =>
Discretized.entropy(dist.params)
case family:NumericalDistributionFamily =>
val min = family.min(dist.params)
val max = family.max(dist.params)
val step = (max - min) / buckets
val params = Discretized(dist, (min.until(max, step)).toArray, 1000).params
Discretized.entropy(params)
}
}
def udf = functions.udf(apply(_))
}
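A quick REPL-style sketch (arbitrary values): for an already-discretized distribution, entropy comes straight from its bins; other numeric families are discretized into 1000 buckets first.
  import org.mimirdb.pip.distribution.{ Discretized, Gaussian }
  import org.mimirdb.pip.udf.Entropy

  val boundaries = (0 to 10).map { i => -3.0 + i * 0.6 }.toArray
  val disc = Discretized(Gaussian(0.0, 1.0), boundaries, samples = 1000)
  assert(Entropy(disc) > 0.0)  // takes the fast Discretized path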

View File

@ -1,4 +1,4 @@
package org.mimirdb.pip
package org.mimirdb.pip.udf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

View File

@ -0,0 +1,60 @@
package org.mimirdb.pip.udt
import org.mimirdb.pip.distribution.DistributionFamily
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.hadoop.shaded.com.nimbusds.jose.util.StandardCharset
import com.fasterxml.jackson.module.scala.deser.overrides
import java.util.UUID
/**
* A random variable
*/
case class RandomVariable(distribution: DistributionFamily, params: Any, id: UUID = UUID.randomUUID())
{
}
/**
* The Spark UserDefinedType used to store a [[RandomVariable]] in a DataFrame
*/
class RandomVariableType extends UserDefinedType[RandomVariable]{
override def equals(other: Any): Boolean = other match {
case _: UserDefinedType[_] => other.isInstanceOf[RandomVariableType]
case _ => false
}
override def sqlType: DataType = BinaryType
override def serialize(obj: RandomVariable): Array[Byte] =
{
val byteBuffer = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(byteBuffer)
out.writeLong(obj.id.getMostSignificantBits())
out.writeLong(obj.id.getLeastSignificantBits())
out.writeUTF(obj.distribution.label)
obj.distribution.serialize(out, obj.params)
out.flush()
return byteBuffer.toByteArray()
}
override def deserialize(datum: Any): RandomVariable = {
val bis = new java.io.ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
val in = new java.io.ObjectInputStream(bis)
val id = new UUID(in.readLong(), in.readLong())
val dist = DistributionFamily(in.readUTF())
val params = dist.deserialize(in)
return RandomVariable(
distribution = dist,
params = params,
id = id
)
}
override def userClass: Class[RandomVariable] = classOf[RandomVariable]
}
case object RandomVariableType extends org.mimirdb.pip.udt.RandomVariableType with Serializable
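A round-trip sketch of the UDT encoding (not part of the commit): the variable's identity (UUID) survives serialization along with its distribution.
  import java.util.UUID
  import org.mimirdb.pip.distribution.Gaussian
  import org.mimirdb.pip.udt.{ RandomVariable, RandomVariableType }

  val x = RandomVariable(Gaussian, Gaussian.Params(0.0, 1.0), UUID.randomUUID())
  val y = RandomVariableType.deserialize(RandomVariableType.serialize(x))
  assert(y.id == x.id && y.params == x.params)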

View File

@ -0,0 +1,47 @@
package org.mimirdb.pip.udt
import org.mimirdb.pip.distribution.DistributionFamily
import org.apache.spark.sql.types.{ DataType, UserDefinedType, BinaryType }
import java.util.UUID
case class UnivariateDistribution(family: DistributionFamily, params: Any)
{
override def toString(): String =
family.describe(params)
}
class UnivariateDistributionType extends UserDefinedType[UnivariateDistribution]
{
override def equals(other: Any): Boolean = other match {
case _: UserDefinedType[_] => other.isInstanceOf[UnivariateDistributionType]
case _ => false
}
override def sqlType: DataType = BinaryType
override def serialize(obj: UnivariateDistribution): Array[Byte] =
{
val byteBuffer = new java.io.ByteArrayOutputStream()
val out = new java.io.ObjectOutputStream(byteBuffer)
out.writeUTF(obj.family.label)
obj.family.serialize(out, obj.params)
out.flush()
return byteBuffer.toByteArray()
}
override def deserialize(datum: Any): UnivariateDistribution = {
val bis = new java.io.ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
val in = new java.io.ObjectInputStream(bis)
val dist = DistributionFamily(in.readUTF())
val params = dist.deserialize(in)
return UnivariateDistribution(
family = dist,
params = params,
)
}
override def userClass: Class[UnivariateDistribution] = classOf[UnivariateDistribution]
}
case object UnivariateDistributionType extends org.mimirdb.pip.udt.UnivariateDistributionType
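The same round trip for plain distributions (a sketch): the family is recovered by label through the DistributionFamily registry.
  import org.mimirdb.pip.distribution.Gaussian
  import org.mimirdb.pip.udt.{ UnivariateDistribution, UnivariateDistributionType }

  val dist = Gaussian(12.0, 4.0)
  val back = UnivariateDistributionType.deserialize(UnivariateDistributionType.serialize(dist))
  assert(back.family == Gaussian && back.params == Gaussian.Params(12.0, 4.0))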

View File

@ -1,4 +1,4 @@
package org.mimirdb
package org.mimirdb.pip
package mimirdb
@ -9,6 +9,7 @@ import scala.util.Random
import org.scalatest.flatspec.AnyFlatSpec
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import distribution._
/* Spark Session used across tests */
object DbServer {
@ -19,7 +20,7 @@ object DbServer {
spark.sparkContext.setLogLevel("WARN")
MimirUDTRegistrator.registerAll()
Pip.init(spark)
}
import DbServer._
@ -32,11 +33,13 @@ class CreateGaussObject extends AnyFlatSpec {
val sd_window = 3
val mean_window = 9
val dfRandData = df.select(rand() * mean_window as "sigma", rand() * sd_window as "mean")
val createGauss = udf((mu, sigma) => Gauss(mu, sigma))
val dfGaussObj = dfRandData.select(createGauss(dfRandData("mean"), dfRandData("sigma")) as "gObj")
val dfGaussObj =
dfRandData.select(
Gaussian(dfRandData("mean"), dfRandData("sigma")) as "gObj"
)
"A dataframe of Gauss objects" should "have values of type Gauss" in {
assert(dfGaussObj.schema("gObj").dataType == GaussType)
assert(dfGaussObj.schema("gObj").dataType == RandomVariableType)
}
}
@ -44,19 +47,22 @@ class CreateGaussObject extends AnyFlatSpec {
/* Adding Gauss objects */
class GaussPlusTests extends AnyFlatSpec {
"Adding two Gauss objects" should "return a correctly computed Gauss object" in {
val firstObj = Gauss(2.0.asInstanceOf[Float], 1.0.asInstanceOf[Float])
val secondObj = Gauss(1.0.asInstanceOf[Float], 1.0.asInstanceOf[Float])
assert(firstObj.gaussPlus(secondObj) === Gauss(3.0.asInstanceOf[Float], math.sqrt(2.0).asInstanceOf[Float]))
val firstParams = Gaussian.Params(2.0, 1.0)
val secondParams = Gaussian.Params(1.0, 1.0)
val sum = Gaussian.plus(firstParams, secondParams)
assert(sum.mean == 3.0)
assert(sum.sd == math.sqrt(2.0))
}
}
/* Subtracting Gauss objects */
class GaussMinusTests extends AnyFlatSpec {
"Subtracting two Gauss objects" should "return a correctly computed Gauss object" in {
val firstObj = Gauss(2.0.asInstanceOf[Float], 1.0.asInstanceOf[Float])
val secondObj = Gauss(1.0.asInstanceOf[Float], 1.0.asInstanceOf[Float])
assert(firstObj.gaussMinus(secondObj) === Gauss(1.0.asInstanceOf[Float], math.sqrt(2.0).asInstanceOf[Float]))
val firstParams = Gaussian.Params(2.0, 1.0)
val secondParams = Gaussian.Params(1.0, 1.0)
val diff = Gaussian.minus(firstParams, secondParams)
assert(diff.mean == 1.0)
assert(diff.sd == math.sqrt(2.0))
}
}
@ -67,12 +73,13 @@ class GaussMultTests extends AnyFlatSpec {
NOTE:
This code seems redundant. It is just performing the same computation as found in Gauss.gaussMult
*/
val firstObj = Gauss(2.0.asInstanceOf[Float], 1.0.asInstanceOf[Float])
val secondObj = Gauss(1.0.asInstanceOf[Float], 1.0.asInstanceOf[Float])
val divisor = math.pow(firstObj.sd, 2) + math.pow(secondObj.sd, 2)
val m_dividend = firstObj.mean * math.pow(secondObj.sd, 2) + secondObj.mean * math.pow(firstObj.sd, 2)
val s_dividend = math.pow(firstObj.sd, 2) * math.pow(secondObj.sd, 2)
assert(firstObj.gaussMult(secondObj) === Gauss((m_dividend/divisor).asInstanceOf[Float], (s_dividend/divisor).asInstanceOf[Float]))
val firstParams = Gaussian.Params(2.0, 1.0)
val secondParams = Gaussian.Params(1.0, 1.0)
val divisor = math.pow(firstParams.sd, 2) + math.pow(secondParams.sd, 2)
val m_dividend = firstParams.mean * math.pow(secondParams.sd, 2) +
secondParams.mean * math.pow(firstParams.sd, 2)
val s_dividend = math.pow(firstParams.sd, 2) * math.pow(secondParams.sd, 2)
assert(Gaussian.mult(firstParams,secondParams) === Gaussian.Params((m_dividend/divisor), (s_dividend/divisor)))
}
}
@ -80,8 +87,8 @@ class GaussMultTests extends AnyFlatSpec {
/* Converting Gauss object to string */
class GaussToStringTests extends AnyFlatSpec {
"Calling toString on Gauss object" should "produce a correct string representation" in {
val gaussObj = Gauss(2.0.asInstanceOf[Float], 1.0.asInstanceOf[Float])
assert(gaussObj.toString() === s"Gauss(mean: ${gaussObj.mean}, stdiv: ${gaussObj.sd})")
val gaussParams = Gaussian.Params(2.0, 1.0)
assert(Gaussian.describe(gaussParams) === s"Gauss(mean: ${gaussParams.mean}, std-dev: ${gaussParams.sd})")
}
}
@ -111,8 +118,9 @@ class HistAggTest extends AnyFlatSpec {
)
).toDF(idName, speedName, intvlName, probName)
// df.show()
"Calling histogram aggregate on dataframe" should "produce correct Array[Double] encoding of histogram" in {
spark.udf.register("histogram", functions.udaf(histogram))
df.createOrReplaceTempView("testData")
val out = spark.sql(s"SELECT histogram($idName, $speedName, $intvlName, $probName) AS hist FROM testData")

View File

@ -1,7 +1,6 @@
package org.mimirdb.pip
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.log4j.{ Logger, Level }
import org.apache.spark.sql.functions._
import scala.util.Random
@ -17,6 +16,7 @@ import java.nio.file.{Paths, Files}
/* For SedonaContext */
import org.apache.sedona.spark.SedonaContext
import org.apache.logging.log4j.core.tools.picocli.CommandLine.Help.Column
import org.mimirdb.pip.distribution._
object Main
{
@ -34,8 +34,9 @@ object Main
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.setCheckpointDir("spark-warehouse")
Mimir.registerAll()
Pip.init(spark)
/*
Reproducing Vizier mars_rover workflow
@ -61,265 +62,128 @@ object Main
}
println("We did find the file, now reading.")
val sedona = SedonaContext.create(spark)
val marsDF = sedona.read.option("multiLine", true).json(fileData)
/*
I am not sure what's going on, but I currently get an error saying that column 'features' can't be found.
I switched to using the SQL source table marsDF, but that throws an undefined-relation error.
I will probably have to hit the docs when I pick this up later. I suspect the cause is in
how Spark creates the view, with the schema arranged differently. There are no problems in Vizier, so
Vizier can be used to verify the schema.
*/
println("Printing marsDF schema...\n------------------------------------")
marsDF.printSchema()
assert(SedonaContext.create(spark) eq spark)
var df = spark.read.option("multiLine", true).json(fileData)
/* Create temporary Spark view to query */
marsDF.createOrReplaceTempView("trips")
df.createOrReplaceTempView("trips")
/*
The following query throws an error: array(string) is not type string; leaving for future investigation
*/
//val sqlDFfilter = sedona.sql("SELECT features.type, features.properties.*, ST_GeomFromGeoJSON(to_json(features.geometry)) as geo " +
// "FROM (SELECT explode(features) AS features FROM trips WHERE features.geometry.type <> \"POINT\")")
//sqlDFfilter.show(false)
////////////////////////////////////////////////////////
// Extract GeoJSON and properties field from the data
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT features.type,
features.properties.*,
ST_GeomFromGeoJSON(to_json(features.geometry)) as geo
FROM (
SELECT explode(features) AS features FROM trips
)
""").coalesce(1)
// sqlDF.printSchema()
// df.show(false)
df.createOrReplaceTempView("traverse_data")
/*
Extract GeoJSON and properties field from the data
*/
val sqlDF = sedona.sql("SELECT features.type, features.properties.*, ST_GeomFromGeoJSON(to_json(features.geometry)) as geo " +
"FROM (SELECT explode(features) AS features FROM trips)")
println("\n\n\nPrinting over sql query over view trips...\n------------------------------")
////////////////////////////////////////////////////////
// Trip Times
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT *,
dist_km - lag(dist_km, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS km_traveled,
sol - lag(sol, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS sols_traveled
FROM traverse_data
WHERE dist_km > 0
""")
// df.show(false)
df.createOrReplaceTempView("traverse_data")
// spark.sql("""
// SELECT max(km_traveled * 1000 / sols_traveled) as m_per_sol
// FROM traverse_data
// WHERE sols_traveled > 0
// """).show()
// return
/*
Basic Sanity checks
*/
sqlDF.printSchema()
sqlDF.show(false)
/* The following breaks. Not sure why. Leaving for future reference */
//sqlDF.filter($"type" <> "POINT") why doesn't this work?; the docs say it should
/*
Count waypoints per site
*/
sqlDF.createOrReplaceTempView("traverse_data")
val wpSite = sedona.sql("SELECT site, COUNT(*) AS num_waypoints, SUM(dist_m) " +
"FROM traverse_data GROUP BY site ORDER BY site;")
/* Output */
wpSite.printSchema()
wpSite.show(false)
/*
Compute Bounding Box
NOTE: max_lat is not consistent with Vizier
*/
val boundBox = sedona.sql("SELECT min(lon) AS min_lon, min(lat) as min_lat, max(lon) AS max_lon, max(lat) AS max_lat " +
"FROM traverse_data;")
boundBox.printSchema()
boundBox.show(false)
/*
Compute Sedona bounding box
*/
boundBox.createOrReplaceTempView("bounding_box")
val sedBoundBox = sedona.sql("SELECT ST_PolygonFromEnvelope(min_lon, min_lat, max_lon, max_lat) FROM bounding_box;")
sedBoundBox.printSchema()
sedBoundBox.show(false)
/*
Trip Distances
*/
val tripDist = sedona.sql("SELECT dist_total_m AS cum_dist_m, dist_m, lon, lat FROM traverse_data;")
tripDist.printSchema()
tripDist.show(false)
////////////////////////////////////////////////////////
// Trip Distances
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT ST_Point(
CAST(lon as decimal(24,20)),
CAST(lat as decimal(24,20))
) as geometry,
clamp(gaussian(cast(km_traveled as double) * 1000 / sols_traveled, 3.0), 0.0, 800) as m_per_sol
FROM traverse_data
WHERE sols_traveled > 0
""")//.checkpoint()
// tripDist.printSchema()
// df.show(false)
df.createOrReplaceTempView("trip_points")
/*
Uncertainty in time metric
*/
val sameDayPairs = sedona.sql("SELECT t1.id, t2.id2, t1.sol, t2.sol " +
"FROM (SELECT monotonically_increasing_id() AS id, * FROM traverse_data) t1, " +
"(SELECT monotonically_increasing_id() AS id2, sol FROM traverse_data) t2 " +
"WHERE t1.sol = t2.sol AND t1.id < t2.id2;")
sameDayPairs.printSchema()
sameDayPairs.show(false)
////////////////////////////////////////////////////////
// Bounding Box
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT min(lat) as min_lat,
max(lat) as max_lat,
min(lon) as min_lon,
max(lon) as max_lon
FROM traverse_data
""")
// df.show(false)
df.createOrReplaceTempView("mission_region")
/*
Uncertainty in time metric (as a count query)
*/
val sameDayCount = sedona.sql("SELECT sol, COUNT(*) as num_trips FROM traverse_data GROUP BY sol HAVING num_trips > 1 " +
"ORDER BY sol;")
sameDayCount.printSchema()
sameDayCount.show(false)
////////////////////////////////////////////////////////
// Example Histogram Regions
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT id, ST_PolygonFromEnvelope(lon_low, lat_low, lon_high, lat_high) as geometry
FROM (
SELECT
10 * lon_idx + lat_idx AS id,
(max_lat - min_lat)/10 * lat_idx + min_lat AS lat_low,
(max_lat - min_lat)/10 * (lat_idx+1) + min_lat AS lat_high,
(max_lon - min_lon)/10 * lon_idx + min_lon AS lon_low,
(max_lon - min_lon)/10 * (lon_idx+1) + min_lon AS lon_high
FROM (SELECT id AS lon_idx from range(0,10)) AS lon_split,
(SELECT id AS lat_idx from range(0,10)) AS lat_split,
mission_region
)
""")
// df.show(false)
df.createOrReplaceTempView("bounding_boxes")
/*
(Synthetic) interval breakdown
*/
val synIntBreak = sedona.sql("SELECT site, COUNT(CASE WHEN drive >= 0 AND drive < 500 THEN 1 END) AS 0_to_500, " +
"COUNT(CASE WHEN drive >= 500 AND drive < 1000 THEN 1 END) AS 500_to_1000, " +
"COUNT(CASE WHEN drive >= 1000 AND drive < 1500 THEN 1 END) AS 1000_to_1500, " +
"COUNT(CASE WHEN drive >= 1500 AND drive < 2000 THEN 1 END) AS 1500_to_2000, " +
"COUNT(CASE WHEN drive >= 2000 AND drive < 2500 THEN 1 END) AS 2000_to_2500, " +
"COUNT(CASE WHEN drive >= 2500 AND drive < 3000 THEN 1 END) AS 2500_to_3000, " +
"COUNT(CASE WHEN drive >= 3000 AND drive < 3500 THEN 1 END) AS 3000_to_3500, " +
"COUNT(CASE WHEN drive >= 3500 AND drive < 4000 THEN 1 END) AS 3500_to_4000, " +
"COUNT(CASE WHEN drive >= 4000 AND drive < 4500 THEN 1 END) AS 4000_to_4500, " +
"COUNT(CASE WHEN drive >= 4500 AND drive < 5000 THEN 1 END) AS 4500_to_5000, " +
"COUNT(CASE WHEN drive >= 5000 AND drive < 5500 THEN 1 END) AS 5000_to_5500, " +
"COUNT(CASE WHEN drive >= 5500 AND drive < 6000 THEN 1 END) AS 5500_to_6000, " +
"COUNT(CASE WHEN drive >= 6000 AND drive < 6500 THEN 1 END) AS 6000_to_6500 " +
"FROM traverse_data GROUP BY site ORDER BY site")
synIntBreak.printSchema()
synIntBreak.show(false)
////////////////////////////////////////////////////////
// Per-region distributions
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT box.id,
array_agg(m_per_sol) as components,
discretize(
uniform_mixture(m_per_sol),
array(0.0, 40.0, 80.0, 120.0, 160.0, 200.0, 240.0, 280.0, 320.0, 360.0, 400.0),
1000
) as m_per_sol
FROM trip_points point,
bounding_boxes box
WHERE ST_Contains(box.geometry, point.geometry)
GROUP BY box.id
""")
// df.show(false)
df.createOrReplaceTempView("grid_squares")
/*
Sanity Check on distance metrics using lat/lon
*/
val latLonDist = sedona.sql("SELECT sol, dist_km, dist_gc2, dist_gc1, dist_km / dist_gc2 as ratio " +
"FROM (SELECT sol, dist_m/1000 as dist_km, " +
"acos(sin(lat1)*sin(lat2)+cos(lat1)*cos(lat2)*cos(lon1-lon2)) * 3389.5 as dist_gc1, " +
"2 * (asin(sqrt(pow(sin( (lat1-lat2)/2 ), 2) + cos(lat1) * cos(lat2) * pow(sin( (lon1-lon2)/2 ), 2) " +
"))) * 3389.5 AS dist_gc2 FROM (SELECT sol, lat * 3.14159265 / 180 as lat2, lon * 3.14159265 / 180 as lon2, " +
"dist_m, lag(lat * 3.14159265 / 180, 1) OVER w as lat1, lag(lon * 3.14159265 / 180, 1) OVER w as lon1 " +
"FROM traverse_data WHERE final='y' WINDOW w as (ORDER BY sol)))")
latLonDist.printSchema()
latLonDist.show(false)
/*
Now, insert uncertainty via Gauss objects.
Let's define the mean to be halfway between the two observations (dist_km, dist_gc2).
Further let the standard deviation be the distance between the mean and an observation.
*/
val create_gauss = udf((mu:Float, sigma:Float) => Gauss(mu, sigma))
//this is a function that takes 2 Column objects and returns a Column object
/*
Technically, the math here is incorrect as we need to know which values are greater and lesser.
Nonetheless, good for demonstration purposes.
*/
val uncertGauss = latLonDist.select(latLonDist("*"), create_gauss(latLonDist("dist_km") - latLonDist("dist_gc2") +
latLonDist("dist_gc2"), latLonDist("dist_km") - latLonDist("dist_gc2")) as "dist_uncert")
uncertGauss.show(false)
/* Projection*/
uncertGauss.select(uncertGauss("dist_uncert")).show(false)
/* Selection
NOTE: breaks
*/
/* A Column method takes one or more parameters; if an argument is not a Column, Spark tries to wrap
it in a Literal (the Spark class representing constant values), but it can't tell how to do that here.
The easy way around this is to call create_gauss.
*/
/* Try things and let them break where they may. Think about what should happen in a given comparison.
You can just define a function in your case class to encode the semantics of the operator, e.g. for disjoint domains
of the distributions. One thing that would be helpful: remind yourself how this is handled in other probabilistic
systems such as MayBMS, PIP, and MCDB. Meet Oliver @ 1:30pm tomorrow */
/*
the predicate uncertGauss("dist_uncert") < create_gauss(3.0.asInstanceOf[Float], 5.0.asInstanceOf[Float]) doesn't work;
I am not sure why, except that it doesn't evaluate as a Column
//uncertGauss.filter(uncertGauss("dist_uncert") < create_gauss(3.0.asInstanceOf[Float], 5.0.asInstanceOf[Float])).show(false)
*/
/*
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '(dist_uncert < 1.0D)' due to data type mismatch:
differing types in '(dist_uncert < 1.0D)' (binary and double). Weird...binary?
Note: even after writing an operator < function in Gauss.scala, I still get the same error. This must be because I have not told
Spark where to look for the comparison operator. How do I do that? Do I need to modify Expression/Column.scala?
*/
uncertGauss.filter(uncertGauss("dist_uncert") < 1.0).show(false)
////////////////////////////////////////////////////////
// Per-region metrics
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT id,
m_per_sol,
entropy(m_per_sol) as entropy
-- components
FROM grid_squares
-- LIMIT 1
""")
df.show(false)
}
}
/*
Old test code. Leaving for reference.
*/
/*
//you could alias spark.range: import spark.range; then line 21 becomes RHS range(10)
//This yields a dataframe that has one column with 10 rows: the integers 0 through 9
val df = spark.range(10)
//val col_n_dist = randn()
/*
val df_rand = df.select(rand().as("Random Data"))
val df_rand1 = df.select(rand() as "More Random Data")
*/
val sd_window = 3
val mean_window = 9
/*
val df_rand_sd = df.select(rand() * sd_window as "Standard Deviation")
val df_rand_mean = df.select((rand() * 9) + 1 as "Mean")
*/
/* Create df of sd/mean measurements */
val df_mean_sd = df.select(expr("id"), rand() * sd_window as "SD", (rand() * mean_window) + 1 as "Mean")
df_mean_sd.show()
/*
Old stuff to be deleted
var gauss_vals = List[Gauss]()
df_mean_sd.take(df_mean_sd.count.toInt).foreach(t => gauss_vals = gauss_vals :+ Gauss(t(1).asInstanceOf[Double].asInstanceOf[Float], t(2).asInstanceOf[Double].asInstanceOf[Float]))
df_mean_sd.take(df_mean_sd.count.toInt).foreach(t => println(s"SD=${t(1)},\tmean=${t(2)}"))
gauss_vals.foreach(o => println(o))
println(gauss_vals)
val test_gauss = Gauss(2, 3)
print(s"Testing: ${test_gauss}")
gauss_vals = gauss_vals :+ test_gauss
print(s"Testing list: ${gauss_vals}")
val rand_gauss = Random.nextGaussian().asInstanceOf[Float]
println(s"\nrandom value geneerated from gaussian distribution with mean 0 standard deviation 1: ${rand_gauss}")
//val class_name = "guassian"
//val class_type = "random"
*/
/* Create spark udf */
val create_gauss = udf((mu, sigma) => Gauss(mu, sigma))//this is a function that takes 2 Column objects and returns a Column object
//you can verify this by printing the type signature of create_gauss
//test .gaussPlus functionality
//think compositionally; break things down into composable units, i.e. use Gauss object parameters
val add_gauss = udf((mu1: Float, sigma1: Float, mu2:Float, sigma2: Float) => Gauss(mu1, sigma1).gaussPlus(Gauss(mu2, sigma2)))
//test .gaussMult functionality
val prod_gauss = udf((mu1: Float, sigma1: Float, mu2: Float, sigma2: Float) => Gauss(mu1, sigma1).gaussMult(Gauss(mu2, sigma2)))
spark.udf.register("create_gauss", create_gauss)
//if we are going to keep add_gauss around, we need to register it
//spark.udf.register("add_gauss", add_gauss)
df_mean_sd.select(df_mean_sd("id"), df_mean_sd("Mean"), df_mean_sd("SD"), create_gauss(df_mean_sd("Mean"), df_mean_sd("SD"))).show()
df_mean_sd.select(df_mean_sd("*"), create_gauss(df_mean_sd("Mean"), df_mean_sd("SD"))).show(false)
df_mean_sd.select(df_mean_sd("*"), create_gauss(df_mean_sd("Mean"), df_mean_sd("SD")), add_gauss(df_mean_sd("Mean"), df_mean_sd("SD"), df_mean_sd("Mean"), df_mean_sd("SD")) as("Add Gauss to itself")).show(false)
df_mean_sd.select(df_mean_sd("*"), create_gauss(df_mean_sd("Mean"), df_mean_sd("SD")), prod_gauss(df_mean_sd("Mean"), df_mean_sd("SD"), df_mean_sd("Mean"), df_mean_sd("SD")) as("(Reflexive) Gauss Product")).show(false)
println(df_mean_sd.select(create_gauss(df_mean_sd("Mean"), df_mean_sd("SD")) as "gObj").schema("gObj").dataType == GaussType)
//false argument tells spark not to truncate data
*/