From 443e34651d3e577a4cd39f6591b17c7af3670daa Mon Sep 17 00:00:00 2001 From: Oliver Date: Fri, 19 Apr 2024 00:03:07 -0400 Subject: [PATCH] WIP: Adding support for boolean ops --- lib/src/org/mimirdb/pip/Pip.scala | 12 ++-- .../pip/distribution/boolean/Bernoulli.scala | 35 ++++++++++ .../pip/distribution/boolean/Between.scala | 70 +++++++++++++++++++ .../pip/distribution/boolean/package.scala | 38 ++++++++++ .../pip/distribution/distributionFamily.scala | 61 ++-------------- .../distribution/{ => numerical}/Clamp.scala | 3 +- .../{ => numerical}/ConstantNumber.scala | 2 +- .../{ => numerical}/Discretized.scala | 2 +- .../{ => numerical}/Gaussian.scala | 2 +- .../{ => numerical}/NumericalMixture.scala | 3 +- .../{ => numerical}/Uniform.scala | 2 +- .../pip/distribution/numerical/package.scala | 55 +++++++++++++++ lib/src/org/mimirdb/pip/udf/Entropy.scala | 4 +- .../org/mimirdb/pip/udf/KLDivergence.scala | 4 +- lib/src/org/mimirdb/pip/udf/Probability.scala | 0 src/org/mimirdb/pip/TestData.scala | 2 +- 16 files changed, 222 insertions(+), 73 deletions(-) create mode 100644 lib/src/org/mimirdb/pip/distribution/boolean/Bernoulli.scala create mode 100644 lib/src/org/mimirdb/pip/distribution/boolean/Between.scala create mode 100644 lib/src/org/mimirdb/pip/distribution/boolean/package.scala rename lib/src/org/mimirdb/pip/distribution/{ => numerical}/Clamp.scala (97%) rename lib/src/org/mimirdb/pip/distribution/{ => numerical}/ConstantNumber.scala (97%) rename lib/src/org/mimirdb/pip/distribution/{ => numerical}/Discretized.scala (99%) rename lib/src/org/mimirdb/pip/distribution/{ => numerical}/Gaussian.scala (99%) rename lib/src/org/mimirdb/pip/distribution/{ => numerical}/NumericalMixture.scala (96%) rename lib/src/org/mimirdb/pip/distribution/{ => numerical}/Uniform.scala (97%) create mode 100644 lib/src/org/mimirdb/pip/distribution/numerical/package.scala create mode 100644 lib/src/org/mimirdb/pip/udf/Probability.scala diff --git a/lib/src/org/mimirdb/pip/Pip.scala b/lib/src/org/mimirdb/pip/Pip.scala index 84374c0..fad145f 100644 --- a/lib/src/org/mimirdb/pip/Pip.scala +++ b/lib/src/org/mimirdb/pip/Pip.scala @@ -28,16 +28,16 @@ object Pip { .functionRegistry .createOrReplaceTempFunction(name, fn, "scala_udf") - registerFunction("gaussian", distribution.Gaussian.Constructor(_)) - registerFunction("uniform", distribution.Uniform.Constructor(_)) - registerFunction("num_const", distribution.ConstantNumber.Constructor(_)) - registerFunction("clamp", distribution.Clamp.Constructor) - registerFunction("discretize", distribution.Discretized.Constructor) + registerFunction("gaussian", distribution.numerical.Gaussian.Constructor(_)) + registerFunction("uniform", distribution.numerical.Uniform.Constructor(_)) + registerFunction("num_const", distribution.numerical.ConstantNumber.Constructor(_)) + registerFunction("clamp", distribution.numerical.Clamp.Constructor) + registerFunction("discretize", distribution.numerical.Discretized.Constructor) spark.udf.register("entropy", udf.Entropy.udf) spark.udf.register("kl_divergence", udf.KLDivergence.udf) // Aggregates - spark.udf.register("uniform_mixture", distribution.NumericalMixture.uniform) + spark.udf.register("uniform_mixture", distribution.numerical.NumericalMixture.uniform) spark.udf.register("histogram", udaf(udf.Histogram)) } diff --git a/lib/src/org/mimirdb/pip/distribution/boolean/Bernoulli.scala b/lib/src/org/mimirdb/pip/distribution/boolean/Bernoulli.scala new file mode 100644 index 0000000..582378b --- /dev/null +++ b/lib/src/org/mimirdb/pip/distribution/boolean/Bernoulli.scala @@ -0,0 +1,35 @@ +package org.mimirdb.pip.distribution.boolean + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.mimirdb.pip.udt.UnivariateDistributionConstructor + +object Bernoulli + extends BooleanDistributionFamily + with ProbabilitySupported +{ + + def probability(params: Any): Double = + params.asInstanceOf[Double] + + def describe(params: Any): String = + s"Bernoulli($params)" + + def sample(params: Any, random: scala.util.Random): Boolean = + random.nextDouble() < params.asInstanceOf[Double] + + def deserialize(in: java.io.ObjectInputStream): Any = + in.readDouble() + + def serialize(out: java.io.ObjectOutputStream, params: Any): Unit = + out.writeDouble(params.asInstanceOf[Double]) + + case class Constructor(args: Seq[Expression]) + extends UnivariateDistributionConstructor + { + def family = Bernoulli + def params(values: Seq[Any]) = values(0).asInstanceOf[Double] + + def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = + copy(args = newChildren) + } +} \ No newline at end of file diff --git a/lib/src/org/mimirdb/pip/distribution/boolean/Between.scala b/lib/src/org/mimirdb/pip/distribution/boolean/Between.scala new file mode 100644 index 0000000..3e7ca07 --- /dev/null +++ b/lib/src/org/mimirdb/pip/distribution/boolean/Between.scala @@ -0,0 +1,70 @@ +package org.mimirdb.pip.distribution.boolean + +import org.mimirdb.pip.distribution.DistributionFamily +import org.mimirdb.pip.distribution.numerical.NumericalDistributionFamily +import org.mimirdb.pip.distribution.numerical.CDFSupported + +object Between + extends BooleanDistributionFamily +{ + case class Params(lower: Double, upper: Double, baseDist: String, baseParams: Any) + { + def dist = + DistributionFamily(baseDist).asInstanceOf[NumericalDistributionFamily] + def apply[A](op: (NumericalDistributionFamily, Any) => A): A = + { + op(dist, baseParams) + } + } + + override def approximateProbability(params: Any, samples: Int): Double = + { + val config = params.asInstanceOf[Params] + + config.dist match { + case dist: CDFSupported => + dist.cdf(config.upper, config.baseParams) + - dist.cdf(config.lower, config.baseParams) + case dist => super.approximateProbability(params, samples) + } + } + override def approximateProbabilityIsFast(params: Any): Boolean = + params.asInstanceOf[Params].dist.isInstanceOf[CDFSupported] + + def describe(params: Any): String = + { + val config = params.asInstanceOf[Params] + s"Between(${config.lower} < ${config { _.describe(_) }} < ${config.upper})" + } + + def sample(params: Any, random: scala.util.Random): Boolean = + { + val config = params.asInstanceOf[Params] + val v:Double = config { _.sample(_, random).asInstanceOf[Double] } + + return (v > config.lower) && (v < config.upper) + } + + def deserialize(in: java.io.ObjectInputStream): Any = + { + val lower = in.readDouble() + val upper = in.readDouble() + val baseDist = in.readUTF() + val dist = DistributionFamily(baseDist) + Params( + lower = lower, + upper = upper, + baseDist = baseDist, + baseParams = dist.deserialize(in) + ) + } + + def serialize(out: java.io.ObjectOutputStream, params: Any): Unit = + { + val config = params.asInstanceOf[Params] + out.writeDouble(config.lower) + out.writeDouble(config.upper) + out.writeUTF(config.baseDist) + config { _.serialize(out, _) } + } +} \ No newline at end of file diff --git a/lib/src/org/mimirdb/pip/distribution/boolean/package.scala b/lib/src/org/mimirdb/pip/distribution/boolean/package.scala new file mode 100644 index 0000000..9f02ddd --- /dev/null +++ b/lib/src/org/mimirdb/pip/distribution/boolean/package.scala @@ -0,0 +1,38 @@ +package org.mimirdb.pip.distribution.boolean + +import org.apache.spark.sql.types.{ DataType, BooleanType } +import org.mimirdb.pip.distribution.DistributionFamily + +/** + * A [Distribution] that specifically samples numbers + */ +trait BooleanDistributionFamily extends DistributionFamily +{ + val baseType = BooleanType + + def approximateProbability(params: Any, samples: Int): Double = + this match { + case c:ProbabilitySupported => c.probability(params) + case _ => + { + val rand = new scala.util.Random() + (0 until samples).count { _ => + sample(params, rand).asInstanceOf[Boolean] + }.toDouble / samples + } + } + + def approximateProbabilityIsFast(params: Any): Boolean = this.isInstanceOf[ProbabilitySupported] +} + +/** + * An add-on to NumericalDistributionFamily that indicates an exact CDF can be computed + */ +trait ProbabilitySupported +{ + val baseType: DataType + + assert(baseType == BooleanType, "Non-boolean distributions can not support probabilities") + + def probability(params: Any): Double +} \ No newline at end of file diff --git a/lib/src/org/mimirdb/pip/distribution/distributionFamily.scala b/lib/src/org/mimirdb/pip/distribution/distributionFamily.scala index 6653c1c..c65533d 100644 --- a/lib/src/org/mimirdb/pip/distribution/distributionFamily.scala +++ b/lib/src/org/mimirdb/pip/distribution/distributionFamily.scala @@ -57,57 +57,6 @@ trait DistributionFamily def label = this.getClass.getSimpleName.toLowerCase } -/** - * A [Distribution] that specifically samples numbers - */ -trait NumericalDistributionFamily extends DistributionFamily -{ - val baseType = DoubleType - - /** - * Compute the CDF - */ - def approximateCDF(value: Double, params: Any, samples: Int): Double = - this match { - case c:CDFSupported => c.cdf(value, params) - case _ => - { - val rand = new scala.util.Random() - (0 until samples).count { _ => - sample(params, rand).asInstanceOf[Double] <= value - }.toDouble / samples - } - } - def approximateCDFIsFast(params: Any): Boolean = this.isInstanceOf[CDFSupported] - - def min(params: Any): Double - def max(params: Any): Double -} - -/** - * An add-on to NumericalDistributionFamily that indicates an exact CDF can be computed - */ -trait CDFSupported -{ - val baseType: DataType - - assert(baseType == DoubleType, "Non-numerical distributions can not support CDFs") - - def cdf(value: Double, params: Any): Double -} - -/** - * An add-on to NumericalDistributionFamily that indicates an exact Inverse CDF can be computed - */ -trait ICDFSupported -{ - val baseType: DataType - - assert(baseType == DoubleType, "Non-numerical distributions can not support ICDFs") - - def icdf(value: Double, params: Any): Double -} - /** * Companion object for distributions: Keeps a registry of all known distributions */ @@ -128,9 +77,9 @@ object DistributionFamily /// Pre-defined distributions - register(Gaussian) - register(NumericalMixture) - register(Clamp) - register(Discretized) - register(Uniform) + register(numerical.Gaussian) + register(numerical.NumericalMixture) + register(numerical.Clamp) + register(numerical.Discretized) + register(numerical.Uniform) } \ No newline at end of file diff --git a/lib/src/org/mimirdb/pip/distribution/Clamp.scala b/lib/src/org/mimirdb/pip/distribution/numerical/Clamp.scala similarity index 97% rename from lib/src/org/mimirdb/pip/distribution/Clamp.scala rename to lib/src/org/mimirdb/pip/distribution/numerical/Clamp.scala index 3c53911..bd9df94 100644 --- a/lib/src/org/mimirdb/pip/distribution/Clamp.scala +++ b/lib/src/org/mimirdb/pip/distribution/numerical/Clamp.scala @@ -1,4 +1,4 @@ -package org.mimirdb.pip.distribution +package org.mimirdb.pip.distribution.numerical import scala.util.Random import java.io.ObjectOutputStream @@ -7,6 +7,7 @@ import org.mimirdb.pip.udt.UnivariateDistribution import org.apache.spark.sql.functions import org.mimirdb.pip.udt.UnivariateDistributionConstructor import org.apache.spark.sql.catalyst.expressions.Expression +import org.mimirdb.pip.distribution.DistributionFamily object Clamp extends NumericalDistributionFamily diff --git a/lib/src/org/mimirdb/pip/distribution/ConstantNumber.scala b/lib/src/org/mimirdb/pip/distribution/numerical/ConstantNumber.scala similarity index 97% rename from lib/src/org/mimirdb/pip/distribution/ConstantNumber.scala rename to lib/src/org/mimirdb/pip/distribution/numerical/ConstantNumber.scala index 85cf063..2f98ab5 100644 --- a/lib/src/org/mimirdb/pip/distribution/ConstantNumber.scala +++ b/lib/src/org/mimirdb/pip/distribution/numerical/ConstantNumber.scala @@ -1,4 +1,4 @@ -package org.mimirdb.pip.distribution +package org.mimirdb.pip.distribution.numerical import scala.util.Random import java.io.Serializable diff --git a/lib/src/org/mimirdb/pip/distribution/Discretized.scala b/lib/src/org/mimirdb/pip/distribution/numerical/Discretized.scala similarity index 99% rename from lib/src/org/mimirdb/pip/distribution/Discretized.scala rename to lib/src/org/mimirdb/pip/distribution/numerical/Discretized.scala index 15c841b..2a243d9 100644 --- a/lib/src/org/mimirdb/pip/distribution/Discretized.scala +++ b/lib/src/org/mimirdb/pip/distribution/numerical/Discretized.scala @@ -1,4 +1,4 @@ -package org.mimirdb.pip.distribution +package org.mimirdb.pip.distribution.numerical import scala.util.Random import java.io.ObjectOutputStream diff --git a/lib/src/org/mimirdb/pip/distribution/Gaussian.scala b/lib/src/org/mimirdb/pip/distribution/numerical/Gaussian.scala similarity index 99% rename from lib/src/org/mimirdb/pip/distribution/Gaussian.scala rename to lib/src/org/mimirdb/pip/distribution/numerical/Gaussian.scala index 3735d70..69d9986 100644 --- a/lib/src/org/mimirdb/pip/distribution/Gaussian.scala +++ b/lib/src/org/mimirdb/pip/distribution/numerical/Gaussian.scala @@ -1,4 +1,4 @@ -package org.mimirdb.pip.distribution +package org.mimirdb.pip.distribution.numerical import scala.util.Random import java.io.Serializable diff --git a/lib/src/org/mimirdb/pip/distribution/NumericalMixture.scala b/lib/src/org/mimirdb/pip/distribution/numerical/NumericalMixture.scala similarity index 96% rename from lib/src/org/mimirdb/pip/distribution/NumericalMixture.scala rename to lib/src/org/mimirdb/pip/distribution/numerical/NumericalMixture.scala index 4f0a544..ec5ed92 100644 --- a/lib/src/org/mimirdb/pip/distribution/NumericalMixture.scala +++ b/lib/src/org/mimirdb/pip/distribution/numerical/NumericalMixture.scala @@ -1,4 +1,4 @@ -package org.mimirdb.pip.distribution +package org.mimirdb.pip.distribution.numerical import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -7,6 +7,7 @@ import org.apache.spark.sql.functions.udaf import org.mimirdb.pip.udt.UnivariateDistribution import java.util.UUID import org.mimirdb.pip.SampleParams +import org.mimirdb.pip.distribution.DistributionFamily object NumericalMixture extends NumericalDistributionFamily diff --git a/lib/src/org/mimirdb/pip/distribution/Uniform.scala b/lib/src/org/mimirdb/pip/distribution/numerical/Uniform.scala similarity index 97% rename from lib/src/org/mimirdb/pip/distribution/Uniform.scala rename to lib/src/org/mimirdb/pip/distribution/numerical/Uniform.scala index 51feb7f..8fe28e3 100644 --- a/lib/src/org/mimirdb/pip/distribution/Uniform.scala +++ b/lib/src/org/mimirdb/pip/distribution/numerical/Uniform.scala @@ -1,4 +1,4 @@ -package org.mimirdb.pip.distribution +package org.mimirdb.pip.distribution.numerical import scala.util.Random import java.io.Serializable diff --git a/lib/src/org/mimirdb/pip/distribution/numerical/package.scala b/lib/src/org/mimirdb/pip/distribution/numerical/package.scala new file mode 100644 index 0000000..40cfe5d --- /dev/null +++ b/lib/src/org/mimirdb/pip/distribution/numerical/package.scala @@ -0,0 +1,55 @@ +package org.mimirdb.pip.distribution.numerical + +import org.apache.spark.sql.types.{ DataType, DoubleType } +import org.mimirdb.pip.distribution.DistributionFamily + +/** + * A [Distribution] that specifically samples numbers + */ +trait NumericalDistributionFamily extends DistributionFamily +{ + val baseType = DoubleType + + /** + * Compute the CDF + */ + def approximateCDF(value: Double, params: Any, samples: Int): Double = + this match { + case c:CDFSupported => c.cdf(value, params) + case _ => + { + val rand = new scala.util.Random() + (0 until samples).count { _ => + sample(params, rand).asInstanceOf[Double] <= value + }.toDouble / samples + } + } + def approximateCDFIsFast(params: Any): Boolean = this.isInstanceOf[CDFSupported] + + def min(params: Any): Double + def max(params: Any): Double +} + +/** + * An add-on to NumericalDistributionFamily that indicates an exact CDF can be computed + */ +trait CDFSupported +{ + val baseType: DataType + + assert(baseType == DoubleType, "Non-numerical distributions can not support CDFs") + + def cdf(value: Double, params: Any): Double +} + +/** + * An add-on to NumericalDistributionFamily that indicates an exact Inverse CDF can be computed + */ +trait ICDFSupported +{ + val baseType: DataType + + assert(baseType == DoubleType, "Non-numerical distributions can not support ICDFs") + + def icdf(value: Double, params: Any): Double +} diff --git a/lib/src/org/mimirdb/pip/udf/Entropy.scala b/lib/src/org/mimirdb/pip/udf/Entropy.scala index 3c2433b..0d6a135 100644 --- a/lib/src/org/mimirdb/pip/udf/Entropy.scala +++ b/lib/src/org/mimirdb/pip/udf/Entropy.scala @@ -1,8 +1,8 @@ package org.mimirdb.pip.udf import org.mimirdb.pip.udt.UnivariateDistribution -import org.mimirdb.pip.distribution.Discretized -import org.mimirdb.pip.distribution.NumericalDistributionFamily +import org.mimirdb.pip.distribution.numerical.Discretized +import org.mimirdb.pip.distribution.numerical.NumericalDistributionFamily import org.apache.spark.sql.functions object Entropy diff --git a/lib/src/org/mimirdb/pip/udf/KLDivergence.scala b/lib/src/org/mimirdb/pip/udf/KLDivergence.scala index d797f74..d4d670e 100644 --- a/lib/src/org/mimirdb/pip/udf/KLDivergence.scala +++ b/lib/src/org/mimirdb/pip/udf/KLDivergence.scala @@ -1,8 +1,8 @@ package org.mimirdb.pip.udf import org.mimirdb.pip.udt.UnivariateDistribution -import org.mimirdb.pip.distribution.Discretized -import org.mimirdb.pip.distribution.NumericalDistributionFamily +import org.mimirdb.pip.distribution.numerical.Discretized +import org.mimirdb.pip.distribution.numerical.NumericalDistributionFamily import org.apache.spark.sql.functions object KLDivergence diff --git a/lib/src/org/mimirdb/pip/udf/Probability.scala b/lib/src/org/mimirdb/pip/udf/Probability.scala new file mode 100644 index 0000000..e69de29 diff --git a/src/org/mimirdb/pip/TestData.scala b/src/org/mimirdb/pip/TestData.scala index 3ac67c2..eda8e65 100644 --- a/src/org/mimirdb/pip/TestData.scala +++ b/src/org/mimirdb/pip/TestData.scala @@ -1,5 +1,5 @@ package org.mimirdb.pip.lib -import org.mimirdb.pip.distribution.Discretized +import org.mimirdb.pip.distribution.numerical.Discretized import scala.util.Random