From 5479e9578cd860783093440c5d17a4810b574140 Mon Sep 17 00:00:00 2001
From: Oliver
Date: Sun, 24 Mar 2024 11:04:00 -0400
Subject: [PATCH] ~100x performance boost for bottom-up clustering

- Profiling identified several opportunities for improvement
- Eliminated spurious allocations in tight loops, particularly in the distance measure
- Fixed the bottom-up algorithm, which was accidentally cubic
---
 .../pip/lib/HierarchicalClustering.scala      |  96 +++---
 lib/src/org/mimirdb/pip/lib/KDTree.scala      |   7 +
 lib/src/org/mimirdb/pip/lib/Time.scala        |   8 +-
 .../pip/lib/HierarchicalClusteringTests.scala |  21 +-
 .../src/org/mimirdb/pip/lib/TestData.scala    |  33 +-
 src/org/mimirdb/pip/Main.scala                | 318 +++++++++---------
 src/org/mimirdb/pip/TestData.scala            |  78 +++++
 7 files changed, 353 insertions(+), 208 deletions(-)
 create mode 100644 src/org/mimirdb/pip/TestData.scala

diff --git a/lib/src/org/mimirdb/pip/lib/HierarchicalClustering.scala b/lib/src/org/mimirdb/pip/lib/HierarchicalClustering.scala
index ca82712..a9e549e 100644
--- a/lib/src/org/mimirdb/pip/lib/HierarchicalClustering.scala
+++ b/lib/src/org/mimirdb/pip/lib/HierarchicalClustering.scala
@@ -4,6 +4,7 @@ import scala.collection.mutable
 import scala.reflect.ClassTag
 import scala.collection.mutable.PriorityQueue
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Stack
 
 object HierarchicalClustering
 {
@@ -52,10 +53,8 @@ object HierarchicalClustering
       }
 
       Group(
-        Array(
-          naive(left.toIndexedSeq.map { elements(_) }, measure),
-          naive(right.toIndexedSeq.map { elements(_) }, measure),
-        ),
+        left = naive(left.toIndexedSeq.map { elements(_) }, measure),
+        right = naive(right.toIndexedSeq.map { elements(_) }, measure),
         radius = furthestDistance,
         size = elements.size,
       )
@@ -114,31 +113,24 @@ object HierarchicalClustering
 
       val newCluster =
         Group(
-          Array[Cluster[I, V]](aCluster, bCluster),
+          left = aCluster,
+          right = bCluster,
           radius = radius,
           size = aCluster.size + bCluster.size
         )
 
-      if(queue.isEmpty){
+      if(tree.isEmpty){
         return newCluster
       } else {
-        val distances =
-          queue.map { _._2 }
-               .flatMap { i => clusters(i).map { i -> _ } }
-               .map { case (idx, (pos, _)) =>
-                  idx -> measure.pointToPoint(centroid, pos)
-               }
-        // the initial check is incomplete... it's possible that all the elements in the
-        // queue have already been merged.  If *this* is empty, then we actually don't
-        // have more work to do. 
- if(distances.isEmpty){ return newCluster } - - // If we're here, we have at least one more pair to merge val newIdx = clusters.size clusters.append( Some(centroid, newCluster) ) - val (closestIdx, distance) = distances.maxBy { _._2 } - + + val (nearestPosition, nearestIdx, nearestDistance) = + tree.nearest(centroid, measure).get + + tree.insert(centroid, newIdx) + queue.enqueue( - (distance, newIdx, closestIdx) + (nearestDistance, newIdx, nearestIdx.head) ) } } @@ -171,14 +163,50 @@ object HierarchicalClustering } } + class ClusterElementIterator[I, V](root: Cluster[I, V]) + extends Iterator[Singleton[I, V]] + { + val stack = Stack[Group[I, V]]() + var nextSingleton: Singleton[I, V] = null + pushLeft(root) + + def pushLeft(node: Cluster[I, V]): Unit = + { + var tmp = node; + while(tmp != null){ + tmp = tmp match { + case g: Group[I, V] => + { + stack.push(g) + g.left + } + case s: Singleton[I, V] => + { + nextSingleton = s + null + } + } + } + } + + def hasNext: Boolean = (nextSingleton != null) + def next: Singleton[I, V] = + { + val ret = nextSingleton + if(stack.isEmpty){ nextSingleton = null } + else { pushLeft(stack.pop.right) } + return ret + } + } + trait Cluster[I, V] { def radius: Double def render(firstPrefix: String, restPrefix: String): String def size: Int - def children: Array[Cluster[I, V]] + def children: Seq[Cluster[I, V]] def orderedIterator = new ClusterIterator(this) - def elements: Iterator[Singleton[I, V]] + def elements = new ClusterElementIterator[I, V](this) def threshold(cutoff: Double): Iterator[Cluster[I, V]] = { if(radius > cutoff){ @@ -190,33 +218,21 @@ object HierarchicalClustering override def toString(): String = render("", "") } - case class Group[I, V](children: Array[Cluster[I,V]], radius: Double, size: Int) extends Cluster[I, V] + case class Group[I, V](left: Cluster[I,V], right: Cluster[I, V], radius: Double, size: Int) extends Cluster[I, V] { + def children = Seq(left, right) def render(firstPrefix: String, restPrefix: String): String = - { firstPrefix + s"- [$radius]\n" + - children.zipWithIndex.map { case (child, idx) => - child.render(restPrefix + " +-", - if(idx == children.size - 1){ - restPrefix + " " - } else { - restPrefix + " | " - } - ) - }.mkString("\n") - } - def elements: Iterator[Singleton[I, V]] = - children.iterator.flatMap { _.elements } + left.render(restPrefix + " +-", restPrefix + " | ") + "\n" + + right.render(restPrefix + " +-", restPrefix + " ") } case class Singleton[I, V](position: Array[I], value: V) extends Cluster[I, V] { + def children = Seq() def radius = 0.0 def size = 1 - def children = Array() def render(firstPrefix: String, restPrefix: String): String = firstPrefix + " <" + position.mkString(", ") + s"> -> $value" - def elements: Iterator[Singleton[I, V]] = - Some(this).iterator } diff --git a/lib/src/org/mimirdb/pip/lib/KDTree.scala b/lib/src/org/mimirdb/pip/lib/KDTree.scala index ae68f67..0891d44 100644 --- a/lib/src/org/mimirdb/pip/lib/KDTree.scala +++ b/lib/src/org/mimirdb/pip/lib/KDTree.scala @@ -5,6 +5,7 @@ class KDTree[I: Ordering, V](dimensions: Int)(implicit iTag: ClassTag[I]) { type Key = Array[I] var root: Option[Node] = None + private var _size = 0 def insert(position: Key, value: V): Unit = { @@ -13,6 +14,7 @@ class KDTree[I: Ordering, V](dimensions: Int)(implicit iTag: ClassTag[I]) case Some(node) => node.insert(position, value) case None => Leaf(position, Set(value), 0) }) + _size = _size + 1 } def insertAll(elements: Iterable[(Key, V)]): Unit = @@ -25,6 +27,7 @@ class KDTree[I: Ordering, 
V](dimensions: Int)(implicit iTag: ClassTag[I])
       case Some(node) => node.insertAll(elements.toSeq)
       case None => fragmentSeq(elements.toSeq, 0)
     })
+    _size = _size + elements.size
   }
 
   def remove(position: Key, value: V): Boolean =
@@ -35,11 +38,15 @@ class KDTree[I: Ordering, V](dimensions: Int)(implicit iTag: ClassTag[I])
       {
         val (newRoot, ret) = r.remove(position, value)
         root = newRoot
+        if(ret){ _size = _size - 1 }
         return ret
       }
     }
   }
 
+  def isEmpty = root.isEmpty
+  def size = _size
+
   def nearest(position: Key, measure: Distance[I], ignore: (Array[I], Set[V], Double) => Boolean = {(_, _, _) => false}): Option[(Key, Set[V], Double)] =
   {
     root.flatMap { _.nearest(
diff --git a/lib/src/org/mimirdb/pip/lib/Time.scala b/lib/src/org/mimirdb/pip/lib/Time.scala
index 54e5b48..ba863e5 100644
--- a/lib/src/org/mimirdb/pip/lib/Time.scala
+++ b/lib/src/org/mimirdb/pip/lib/Time.scala
@@ -12,4 +12,10 @@ object Time {
 
   def apply[V](label: String)(f: => V): V =
     apply( t => println(s"[$label] $t s") )(f)
-}
\ No newline at end of file
+
+  case class Timer(label: String)
+  {
+    var tot = 0.0
+    def apply[V](f: => V): V = Time.apply( tot += _ ){ f }
+  }
+}
diff --git a/lib/test/src/org/mimirdb/pip/lib/HierarchicalClusteringTests.scala b/lib/test/src/org/mimirdb/pip/lib/HierarchicalClusteringTests.scala
index 10fe485..b6d9adf 100644
--- a/lib/test/src/org/mimirdb/pip/lib/HierarchicalClusteringTests.scala
+++ b/lib/test/src/org/mimirdb/pip/lib/HierarchicalClusteringTests.scala
@@ -83,6 +83,12 @@ class HierarchicalClusteringTests extends AnyFlatSpec {
     }
   }
 
+  it should "run fast" in {
+    val data = TestData.makeData(500).zipWithIndex
+    Time("Hierarchical@500"){
+      HierarchicalClustering.bottomUp(data, EuclideanDistance)
+    }
+  }
 
   it should "perform sensibly" in {
 
@@ -94,15 +100,14 @@ class HierarchicalClusteringTests extends AnyFlatSpec {
     val datasets =
       Seq(
         100,
-        200,
-        300,
-        400,
+        250,
         500,
-        600,
-        700,
-        800,
-        900,
         1000,
+        2500,
+        5000,
+        10000,
+        // 25000,
+        // 50000
       ).map { s =>
         s -> TestData.makeData(s).zipWithIndex
       }
@@ -112,7 +117,7 @@ class HierarchicalClusteringTests extends AnyFlatSpec {
     log("size, time_s")
     for( (size, data) <- datasets )
     {
-      val skip = (strategy == "Naive" && size > 400)
+      val skip = (strategy == "Naive" && size >= 1000)
       if(!skip){
         Time(t => { log(s"$size, $t"); println(s"$strategy @ $size: $t s") }){
           makeClusters(data, EuclideanDistance)
diff --git a/lib/test/src/org/mimirdb/pip/lib/TestData.scala b/lib/test/src/org/mimirdb/pip/lib/TestData.scala
index 1f461ea..d190bc1 100644
--- a/lib/test/src/org/mimirdb/pip/lib/TestData.scala
+++ b/lib/test/src/org/mimirdb/pip/lib/TestData.scala
@@ -26,25 +26,44 @@ object TestData
     }
     def centroid(a: Iterable[Array[Double]]): Array[Double] =
     {
-      (0 until DIMENSIONS)
-        .map { i => a.map { _(i) }.sum / a.size }
-        .toArray
+      val ret = Array.ofDim[Double](DIMENSIONS)
+      for(pt <- a)
+      {
+        for(i <- 0 until DIMENSIONS)
+        {
+          ret(i) += pt(i)
+        }
+      }
+      for(i <- 0 until DIMENSIONS)
+      {
+        ret(i) /= a.size
+      }
+      return ret
     }
   }
   object ManhattanDistance extends BaseDistance
   {
     def pointToPoint(a: Array[Double], b: Array[Double]): Double =
    {
-      a.zip(b).map { case (a, b) => Math.abs(a - b) }.sum
+      var tot = 0.0
+      for(i <- 0 until a.size)
+      {
+        tot += Math.abs(a(i) - b(i))
+      }
+      return tot
    }
  }
  object EuclideanDistance extends BaseDistance
  {
    def pointToPoint(a: Array[Double], b: Array[Double]): Double =
    {
-      Math.sqrt(
-        a.zip(b).map { case (a, b) => Math.pow(Math.abs(a - b), 2) }.sum
-      )
+      var tot = 0.0
+      for(i <- 0 until a.size)
+      {
+        val v = Math.abs(a(i) - b(i))
+        tot += v * v
+      }
+      
Math.sqrt(tot) } } diff --git a/src/org/mimirdb/pip/Main.scala b/src/org/mimirdb/pip/Main.scala index a15d0dc..68ded0e 100644 --- a/src/org/mimirdb/pip/Main.scala +++ b/src/org/mimirdb/pip/Main.scala @@ -18,6 +18,9 @@ import org.apache.sedona.spark.SedonaContext import org.apache.logging.log4j.core.tools.picocli.CommandLine.Help.Column import org.mimirdb.pip.distribution._ +import scala.util.Random +import org.mimirdb.pip.lib._ + object Main { @@ -28,167 +31,178 @@ object Main def main(args: Array[String]): Unit = { - val spark = SparkSession.builder - .appName("pip") - .master("local[*]") - .getOrCreate() + val size = + args.headOption.map { _.toInt }.getOrElse { 4000 } - spark.sparkContext.setLogLevel("WARN") - spark.sparkContext.setCheckpointDir("spark-warehouse") + println(s"Size: $size") - Pip.init(spark) - - /* - Reproducing Vizier mars_rover workflow - NOTE: - this will probably be migrated into its own files later down the road - */ - - import org.apache.spark.sql.DataFrameReader - /* It appears hadoop doesn't like urls, so need to - i) download the file locally - ii) then read it in - - */ - val webData = "https://mars.nasa.gov/mmgis-maps/M20/Layers/json/M20_waypoints.json" - val fileData: String = "marsRoverData.json" - - /* We don't need to download if we already have the file. - Source: http://stackoverflow.com/questions/21177107/ddg#21178667 - */ - if (!Files.exists(Paths.get(fileData))) { - println("We didn't find the file. Now downloading...") - fileDownload(webData, fileData) + val data = TestData.makeData(size).zipWithIndex + Time(s"Hierarchical@$size"){ + HierarchicalClustering.bottomUp(data, TestData.EuclideanDistance) } - - println("We did find the file, now reading.") - assert(SedonaContext.create(spark) eq spark) - var df = spark.read.option("multiLine", true).json(fileData) - /* Create temporary Spark view to query */ - df.createOrReplaceTempView("trips") - - //////////////////////////////////////////////////////// - // Extract GeoJSON and properties field from the data - //////////////////////////////////////////////////////// - df = spark.sql(""" - SELECT features.type, - features.properties.*, - ST_GeomFromGeoJSON(to_json(features.geometry)) as geo - FROM ( - SELECT explode(features) AS features FROM trips - ) - """).coalesce(1) - // sqlDF.printSchema() - // df.show(false) - df.createOrReplaceTempView("traverse_data") - //////////////////////////////////////////////////////// - // Trip Times - //////////////////////////////////////////////////////// - df = spark.sql(""" - SELECT *, - dist_km - lag(dist_km, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS km_traveled, - sol - lag(sol, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS sols_traveled - FROM traverse_data - WHERE dist_km > 0 - """) - // df.show(false) - df.createOrReplaceTempView("traverse_data") - // spark.sql(""" - // SELECT max(km_traveled * 1000 / sols_traveled) as m_per_sol + // val spark = SparkSession.builder + // .appName("pip") + // .master("local[*]") + // .getOrCreate() + + // spark.sparkContext.setLogLevel("WARN") + // spark.sparkContext.setCheckpointDir("spark-warehouse") + + // Pip.init(spark) + + // /* + // Reproducing Vizier mars_rover workflow + // NOTE: + // this will probably be migrated into its own files later down the road + // */ + + // import org.apache.spark.sql.DataFrameReader + // /* It appears hadoop doesn't like urls, so need to + // i) download the file locally + // ii) then read it in + + // */ + // val webData = "https://mars.nasa.gov/mmgis-maps/M20/Layers/json/M20_waypoints.json" 
+ // val fileData: String = "marsRoverData.json" + + // /* We don't need to download if we already have the file. + // Source: http://stackoverflow.com/questions/21177107/ddg#21178667 + // */ + // if (!Files.exists(Paths.get(fileData))) { + // println("We didn't find the file. Now downloading...") + // fileDownload(webData, fileData) + // } + + // println("We did find the file, now reading.") + // assert(SedonaContext.create(spark) eq spark) + // var df = spark.read.option("multiLine", true).json(fileData) + + // /* Create temporary Spark view to query */ + // df.createOrReplaceTempView("trips") + + // //////////////////////////////////////////////////////// + // // Extract GeoJSON and properties field from the data + // //////////////////////////////////////////////////////// + // df = spark.sql(""" + // SELECT features.type, + // features.properties.*, + // ST_GeomFromGeoJSON(to_json(features.geometry)) as geo + // FROM ( + // SELECT explode(features) AS features FROM trips + // ) + // """).coalesce(1) + // // sqlDF.printSchema() + // // df.show(false) + // df.createOrReplaceTempView("traverse_data") + + // //////////////////////////////////////////////////////// + // // Trip Times + // //////////////////////////////////////////////////////// + // df = spark.sql(""" + // SELECT *, + // dist_km - lag(dist_km, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS km_traveled, + // sol - lag(sol, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS sols_traveled + // FROM traverse_data + // WHERE dist_km > 0 + // """) + // // df.show(false) + // df.createOrReplaceTempView("traverse_data") + // // spark.sql(""" + // // SELECT max(km_traveled * 1000 / sols_traveled) as m_per_sol + // // FROM traverse_data + // // WHERE sols_traveled > 0 + // // """).show() + // // return + + // //////////////////////////////////////////////////////// + // // Trip Distances + // //////////////////////////////////////////////////////// + // df = spark.sql(""" + // SELECT ST_Point( + // CAST(lon as decimal(24,20)), + // CAST(lat as decimal(24,20)) + // ) as geometry, + // clamp(gaussian(cast(km_traveled as double) * 1000 / sols_traveled, 3.0), 0.0, 800) as m_per_sol // FROM traverse_data // WHERE sols_traveled > 0 - // """).show() - // return + // """)//.checkpoint() + // // tripDist.printSchema() + // // df.show(false) + // df.createOrReplaceTempView("trip_points") - //////////////////////////////////////////////////////// - // Trip Distances - //////////////////////////////////////////////////////// - df = spark.sql(""" - SELECT ST_Point( - CAST(lon as decimal(24,20)), - CAST(lat as decimal(24,20)) - ) as geometry, - clamp(gaussian(cast(km_traveled as double) * 1000 / sols_traveled, 3.0), 0.0, 800) as m_per_sol - FROM traverse_data - WHERE sols_traveled > 0 - """)//.checkpoint() - // tripDist.printSchema() + + // //////////////////////////////////////////////////////// + // // Bounding Box + // //////////////////////////////////////////////////////// + // df = spark.sql(""" + // SELECT min(lat) as min_lat, + // max(lat) as max_lat, + // min(lon) as min_lon, + // max(lon) as max_lon + // FROM traverse_data + // """) + // // df.show(false) + // df.createOrReplaceTempView("mission_region") + + // //////////////////////////////////////////////////////// + // // Example Histogram Regions + // //////////////////////////////////////////////////////// + // df = spark.sql(""" + // SELECT id, ST_PolygonFromEnvelope(lon_low, lat_low, lon_high, lat_high) as geometry + // FROM ( + // SELECT + // 10 * lon_idx + lat_idx AS id, + // (max_lat - 
min_lat)/10 * lat_idx + min_lat AS lat_low, + // (max_lat - min_lat)/10 * (lat_idx+1) + min_lat AS lat_high, + // (max_lon - min_lon)/10 * lon_idx + min_lon AS lon_low, + // (max_lon - min_lon)/10 * (lon_idx+1) + min_lon AS lon_high + // FROM (SELECT id AS lon_idx from range(0,10)) AS lon_split, + // (SELECT id AS lat_idx from range(0,10)) AS lat_split, + // mission_region + // ) + // """) + // // df.show(false) + // df.createOrReplaceTempView("bounding_boxes") + + // //////////////////////////////////////////////////////// + // // Per-region distributions + // //////////////////////////////////////////////////////// + // df = spark.sql(""" + // SELECT box.id, + // array_agg(m_per_sol) as components, + // discretize( + // uniform_mixture(m_per_sol), + // array(0.0, 40.0, 80.0, 120.0, 160.0, 200.0, 240.0, 280.0, 320.0, 360.0, 400.0), + // 1000 + // ) as m_per_sol + // FROM trip_points point, + // bounding_boxes box + // WHERE ST_Contains(box.geometry, point.geometry) + // GROUP BY box.id + // """) + // // df.show(false) + // df.createOrReplaceTempView("grid_squares") + + // //////////////////////////////////////////////////////// + // // Per-region metrics + // //////////////////////////////////////////////////////// + // df = spark.sql(""" + // SELECT id, + // m_per_sol, + // entropy(m_per_sol) as entropy, + // array_max( + // transform( + // components, + // x -> kl_divergence(x, m_per_sol) + // ) + // ) as max_kl_div + // -- components + // FROM grid_squares + // -- LIMIT 1 + // """) // df.show(false) - df.createOrReplaceTempView("trip_points") - - - //////////////////////////////////////////////////////// - // Bounding Box - //////////////////////////////////////////////////////// - df = spark.sql(""" - SELECT min(lat) as min_lat, - max(lat) as max_lat, - min(lon) as min_lon, - max(lon) as max_lon - FROM traverse_data - """) - // df.show(false) - df.createOrReplaceTempView("mission_region") - - //////////////////////////////////////////////////////// - // Example Histogram Regions - //////////////////////////////////////////////////////// - df = spark.sql(""" - SELECT id, ST_PolygonFromEnvelope(lon_low, lat_low, lon_high, lat_high) as geometry - FROM ( - SELECT - 10 * lon_idx + lat_idx AS id, - (max_lat - min_lat)/10 * lat_idx + min_lat AS lat_low, - (max_lat - min_lat)/10 * (lat_idx+1) + min_lat AS lat_high, - (max_lon - min_lon)/10 * lon_idx + min_lon AS lon_low, - (max_lon - min_lon)/10 * (lon_idx+1) + min_lon AS lon_high - FROM (SELECT id AS lon_idx from range(0,10)) AS lon_split, - (SELECT id AS lat_idx from range(0,10)) AS lat_split, - mission_region - ) - """) - // df.show(false) - df.createOrReplaceTempView("bounding_boxes") - - //////////////////////////////////////////////////////// - // Per-region distributions - //////////////////////////////////////////////////////// - df = spark.sql(""" - SELECT box.id, - array_agg(m_per_sol) as components, - discretize( - uniform_mixture(m_per_sol), - array(0.0, 40.0, 80.0, 120.0, 160.0, 200.0, 240.0, 280.0, 320.0, 360.0, 400.0), - 1000 - ) as m_per_sol - FROM trip_points point, - bounding_boxes box - WHERE ST_Contains(box.geometry, point.geometry) - GROUP BY box.id - """) - // df.show(false) - df.createOrReplaceTempView("grid_squares") - - //////////////////////////////////////////////////////// - // Per-region metrics - //////////////////////////////////////////////////////// - df = spark.sql(""" - SELECT id, - m_per_sol, - entropy(m_per_sol) as entropy, - array_max( - transform( - components, - x -> kl_divergence(x, m_per_sol) - ) - ) 
as max_kl_div - -- components - FROM grid_squares - -- LIMIT 1 - """) - df.show(false) } } diff --git a/src/org/mimirdb/pip/TestData.scala b/src/org/mimirdb/pip/TestData.scala new file mode 100644 index 0000000..3ac67c2 --- /dev/null +++ b/src/org/mimirdb/pip/TestData.scala @@ -0,0 +1,78 @@ +package org.mimirdb.pip.lib +import org.mimirdb.pip.distribution.Discretized + +import scala.util.Random + +object TestData +{ + + val DIMENSIONS = 10 + val BIN_SIZE = 1.0 / DIMENSIONS + + def positionToBins(position: Array[Double]): Seq[Discretized.Bin] = + { + position.zipWithIndex.map { case (b, i) => + Discretized.Bin(i * BIN_SIZE, (i+1) * BIN_SIZE, b) + }.toSeq + } + + trait BaseDistance extends Distance[Double] + { + val min = 0.0 + val max = 1.0 + def pointToPlane(a: Array[Double], b: Double, dim: Int): Double = + { + Math.abs(a(dim) - b) + } + def centroid(a: Iterable[Array[Double]]): Array[Double] = + { + val ret = Array.ofDim[Double](DIMENSIONS) + for(pt <- a) + { + for(i <- 0 until DIMENSIONS) + { + ret(i) += pt(i) + } + } + for(i <- 0 until DIMENSIONS) + { + ret(i) /= a.size + } + return ret + } + } + object ManhattanDistance extends BaseDistance + { + def pointToPoint(a: Array[Double], b: Array[Double]): Double = + { + var tot = 0.0 + for(i <- 0 until a.size) + { + tot += Math.abs(a(i) - b(i)) + } + return tot + } + } + object EuclideanDistance extends BaseDistance + { + def pointToPoint(a: Array[Double], b: Array[Double]): Double = + { + var tot = 0.0 + for(i <- 0 until a.size) + { + val v = Math.abs(a(i) - b(i)) + tot += v * v + } + Math.sqrt(tot) + } + } + + def makeData(size: Int): IndexedSeq[Array[Double]] = + { + (0 until size).map { _ => + (0 until DIMENSIONS).map { _ => + Random.nextDouble + }.toArray + }.toArray.toIndexedSeq + } +} \ No newline at end of file
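
Note on the distance-measure rewrite: the old pointToPoint implementations
allocated on every call (a.zip(b) builds an array of tuples, and map builds
another intermediate collection), and pointToPoint sits in the innermost loop
of both clustering algorithms. The replacements accumulate into a local var
in a single pass instead. A minimal standalone sketch of the same pattern
(illustrative names, not code from this patch):

    object DistanceSketch
    {
      // Allocating version: builds two temporary collections per call.
      def euclideanSlow(a: Array[Double], b: Array[Double]): Double =
        Math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

      // Allocation-free version: one pass over the indices, no temporaries.
      def euclideanFast(a: Array[Double], b: Array[Double]): Double =
      {
        var tot = 0.0
        var i = 0
        while(i < a.length)
        {
          val v = a(i) - b(i)
          tot += v * v
          i += 1
        }
        Math.sqrt(tot)
      }
    }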
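
Note on the "accidentally cubic" fix: before this patch, every merge in
bottomUp rebuilt a distance list by scanning each entry still in the queue
and recomputing a point-to-point distance for it, so the cost of a single
merge grew with the number of live clusters, and the merge loop as a whole
degraded to roughly cubic time. The rewrite keeps the centroids of live
clusters in the KDTree and asks it for a nearest neighbor instead. A
simplified sketch of that query-then-insert step, assuming the KDTree API
added in this patch (nearest returns Option[(Key, Set[V], Double)]); the
real logic lives in HierarchicalClustering.bottomUp:

    import org.mimirdb.pip.lib.{ KDTree, Distance }
    import scala.collection.mutable.PriorityQueue

    // One merge step, simplified. `tree` holds the centroids of clusters
    // that are still eligible for merging; `.get` is safe only because
    // bottomUp checks tree.isEmpty before reaching this point.
    def enqueueNearest(
      tree: KDTree[Double, Int],
      queue: PriorityQueue[(Double, Int, Int)],
      centroid: Array[Double],
      newIdx: Int,
      measure: Distance[Double]
    ): Unit =
    {
      // Query *before* inserting the new centroid, so that it is not
      // returned as its own nearest neighbor at distance zero.
      val (_, nearestIds, nearestDistance) =
        tree.nearest(centroid, measure).get
      tree.insert(centroid, newIdx)
      // Expected O(log n) per merge, replacing the old O(n) rescan.
      queue.enqueue( (nearestDistance, newIdx, nearestIds.head) )
    }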
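
Note on the new Time.Timer helper: unlike Time.apply, which reports a single
run, Timer accumulates elapsed seconds across calls, which is handy when
profiling a phase that executes many times. A usage sketch (the call site and
runOneTrial are hypothetical, not from this patch):

    val clusterTime = Time.Timer("clustering")
    for(trial <- 0 until 10){
      clusterTime { runOneTrial() }
    }
    println(s"[${clusterTime.label}] total: ${clusterTime.tot} s")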