Added aggregator to produce histogram.

main
ahuber 2024-01-23 07:08:48 -05:00
parent abe79580b3
commit 62976e3871
98 changed files with 138 additions and 4 deletions


@@ -1,6 +1,6 @@
{
"javaSemanticDBVersion": "0.9.6",
"semanticDBVersion": "4.8.12",
"javaSemanticDBVersion": "0.9.8",
"semanticDBVersion": "4.8.14",
"supportedScalaVersions": [
"2.13.12",
"2.12.18",

Binary file not shown.

README (new file, 3 lines)

@@ -0,0 +1,3 @@
It looks like we may not need these files:
random*


@@ -1,6 +1,6 @@
package org.mimirdb
import scala.util.Random
import java.io.Serializable
/* Gaussian object NOTE: Scala uses Camel Case; create a new class file for this UDT and its associated methods */
final case class Gauss(mean: Float, sd: Float) {
def generate_value():Float = {
@@ -37,5 +37,44 @@ final case class Gauss(mean: Float, sd: Float) {
override def toString(): String = {
s"Gauss(mean: ${mean}, stdiv: ${sd})"
}
/*
Comparison `<` prototype.
Original idea:
i) take a general parameter 'other'
ii) match it to its actual runtime type
iii) return the result of the comparison, which could be an error
Difficulties: what to do when the distributions overlap, and how to signal incompatible types (see the ??? cases below).
*/
def <(other: Any): Boolean = other match {
case i: Int => this < i.toFloat // asInstanceOf would throw on a boxed Integer; toFloat converts
case f: Float => f match {
case y if (this.mean + (this.sd * 3) < y) => true // assumption: the ~0.3% of probability mass above mean + 3*sd does not matter
case _ => false
}
case d: Double => this < d.toFloat
case g: Gauss => g match {
case above if (above.mean - (3 * above.sd) >= this.mean + (3 * this.sd)) => true // domains essentially disjoint, g above this
case below if (below.mean + (3 * below.sd) < this.mean - (3 * this.sd)) => false // domains essentially disjoint, g below this
case _ => ??? // overlapping domains: here is where we make an estimate and give a confidence level
}
case _ => ??? // this should raise an incompatible-types exception; how is that done in the Spark/Scala ecosystem?
}
def >=(other: Any): Boolean = !(this < other)
def >(other: Any): Boolean = other match {
case i: Int => this > i.toFloat
case f: Float => f match {
case y if (this.mean - (3 * this.sd) > y) => true // essentially all of the distribution lies above y
case n if (this.mean + (3 * this.sd) <= n) => false // essentially all of the distribution lies below n
case _ => ??? // overlapping region: here is where we make an estimate and give a confidence level
}
case d: Double => this > d.toFloat
case _ => ??? // incompatible-types exception
}
def <=(other: Any): Boolean = !(this > other)
}//this needs to be serializable
}//this needs to be serializable; is this as simple as object Gauss extends serializable?
object Gauss extends Serializable
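
For the overlapping-domains case marked ??? above, one way to "make an estimate and give a confidence level" is to compute the probability that a draw from this distribution falls below a draw from the other one, and accept the comparison only when that probability clears a threshold. A minimal sketch, not part of the commit, assuming commons-math3 is on the classpath (it is a Spark dependency); probLessThan and the 0.95 threshold are illustrative names and values only:

import org.apache.commons.math3.distribution.NormalDistribution

// If X ~ N(mean, sd) and Y ~ N(other.mean, other.sd) are independent, then X - Y is
// N(mean - other.mean, sqrt(sd^2 + other.sd^2)), so P(X < Y) = Phi((other.mean - mean) / sqrt(sd^2 + other.sd^2)).
def probLessThan(g: Gauss, other: Gauss): Double = {
  val combinedSd = math.sqrt(g.sd.toDouble * g.sd + other.sd.toDouble * other.sd)
  if (combinedSd == 0.0) { if (g.mean < other.mean) 1.0 else 0.0 }
  else new NormalDistribution(0.0, 1.0).cumulativeProbability((other.mean - g.mean) / combinedSd)
}

// e.g. treat "g1 < g2" as true only when we are at least 95% confident
def lessThanWithConfidence(g1: Gauss, g2: Gauss, confidence: Double = 0.95): Boolean =
  probLessThan(g1, g2) >= confidence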


@@ -0,0 +1,5 @@
package org.mimirdb
trait GaussDistribution {
}


@@ -16,6 +16,7 @@ import java.nio.file.{Paths, Files}
/* For SedonaContext */
import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.Column // Spark's Column, not the log4j/picocli helper of the same name
object Pip
{
@@ -187,6 +188,52 @@ object Pip
latLonDist.show(false)
/*
Now, insert uncertainty via Gauss objects.
Let's define the mean to be halfway between the two observations (dist_km, dist_gc2).
Further let the standard deviation be the distance between the mean and an observation.
*/
val create_gauss = udf((mu: Float, sigma: Float) => Gauss(mu, sigma)) // the resulting UDF takes two Column arguments and returns a Column
/*
Technically the math here is still not quite right, since we would need to know which observation is the greater one
(the standard deviation could come out negative). Nonetheless, it is good enough for demonstration purposes.
*/
val uncertGauss = latLonDist.select(latLonDist("*"),
create_gauss((latLonDist("dist_km") + latLonDist("dist_gc2")) / 2,
(latLonDist("dist_km") - latLonDist("dist_gc2")) / 2) as "dist_uncert")
uncertGauss.show(false)
/* Projection*/
uncertGauss.select(uncertGauss("dist_uncert")).show(false)
/* Selection
NOTE: breaks
*/
/* The comparison methods on Column take one or more parameters; if an argument is not a Column, Spark tries to wrap
it in a Literal (Spark's class for representing constant values), but it does not know how to do that for a Gauss.
The easy way around this is to call create_gauss.
*/
/* Try things and let them break where they may. Think about what should happen in a given comparison.
You can just define a function in your case class to code the semantics of the operator, e.g. for disjoint domains
of the distributions. One thing that would be helpful: remind yourself how this is handled in other (probabilistic)
systems, e.g. MayBMS, PIP, MCDB. Meet Oliver @ 1:30pm tomorrow */
/*
the predicate uncertGauss("dist_uncert") < create_gauss(3.0.asInstanceOf[Float], 5.0.asInstanceOf[Float]) doesn't work;
I am not sure why, except that it doesn't evaluate as a Column (the UDF's apply takes Column arguments, so the constants
would at least need to be wrapped with lit).
//uncertGauss.filter(uncertGauss("dist_uncert") < create_gauss(3.0.asInstanceOf[Float], 5.0.asInstanceOf[Float])).show(false)
*/
/*
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '(dist_uncert < 1.0D)' due to data type mismatch:
differing types in '(dist_uncert < 1.0D)' (binary and double). Weird...binary?
Note: even after writing an operator < function in Gauss.scala, I still get the same error. This must be because I have not told
Spark where to look for the comparison operator. How do I do that? Do I need to modify Expression/Column.scala?
*/
uncertGauss.filter(uncertGauss("dist_uncert") < 1.0).show(false)
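/*
A possible workaround sketch (illustrative, not from this commit): instead of asking Spark to resolve '<' on the
dist_uncert column, push the comparison into a boolean UDF. Assumes dist_uncert is the (mean, sd) struct produced by
create_gauss above and that lit from org.apache.spark.sql.functions is in scope (udf already is); gauss_lt and the
mean + 3*sd rule are illustrative and mirror the prototype in Gauss.scala.
*/
val gauss_lt = udf((mean: Float, sd: Float, threshold: Double) => mean + 3 * sd < threshold)
uncertGauss.filter(gauss_lt(uncertGauss("dist_uncert.mean"), uncertGauss("dist_uncert.sd"), lit(1.0))).show(false)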
}


@@ -0,0 +1,40 @@
package org.mimirdb
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
/* The presumable structure of an input tuple */
case class RowOfData(id: String, speed: Double, interval: Int)
/* Following spark documentation for implementation */
object histogram extends Aggregator[RowOfData, Array[(Double, Int)], Array[Double]] {
//Do I ASSUME that the set of tuples being aggregated all share the same point and are thus drawn from the same distribution?
//If we use an Array data structure there must be an order to the input, while in general the input order is arbitrary.
//PLAN:
//i) reduce just saves each speed with its corresponding interval
//ii) finish sorts by interval and returns the speeds in that order
//iii) merge just concatenates the buffers, to be sorted and 'stripped' by the finish routine
/* An intermediary data structure to aid with ordering the data in the Array; currently unused
(and mutable state on a shared object would not be safe inside an Aggregator anyway) */
var position = Array[Int]()
def zero: Array[(Double, Int)] = Array[(Double, Int)]()
def reduce(buffer: Array[(Double, Int)], dataPoint: RowOfData): Array[(Double, Int)] = {
buffer :+ (dataPoint.speed, dataPoint.interval)
}
def merge(b1: Array[(Double, Int)], b2: Array[(Double, Int)]): Array[(Double, Int)] = b1 ++ b2
def finish(reduction: Array[(Double, Int)]): Array[Double] = {
reduction.sortBy(_._2).map(x => x._1)
}
def bufferEncoder: Encoder[Array[(Double, Int)]] = ExpressionEncoder()
def outputEncoder: Encoder[Array[Double]] = ExpressionEncoder()
}
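
A small usage sketch for the aggregator, not part of the commit: with the typed Dataset API, each id's speeds come back as one array ordered by interval. The HistogramExample object, the local SparkSession, and the hand-made rows are illustrative only.

package org.mimirdb
import org.apache.spark.sql.SparkSession

object HistogramExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("histogram-demo").master("local[*]").getOrCreate()
    import spark.implicits._
    // toy input; a real pipeline would read these rows from a table
    val ds = Seq(
      RowOfData("a", 25.0, 2),
      RowOfData("a", 30.0, 1),
      RowOfData("b", 40.0, 1)
    ).toDS()
    // one array of speeds per id, sorted by interval in finish()
    ds.groupByKey(_.id)
      .agg(histogram.toColumn.name("speeds_by_interval"))
      .show(false)
    spark.stop()
  }
}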