~100x performance boost for bottom-up clustering

- Profiling identified several points of improvement (sketched below, after the commit metadata)
  - Eliminating spurious allocations in tight loops, particularly in the distance measure
  - The bottom-up algorithm was accidentally cubic
Branch: spatial-index
Oliver Kennedy 2024-03-24 11:04:00 -04:00
parent a81aab3a68
commit 5479e9578c
Signed by: okennedy
GPG Key ID: 3E5F9B3ABD3FDB60
7 changed files with 353 additions and 208 deletions
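
Both fixes are visible in the diffs below. As a reference point, here is a minimal sketch of the allocation-free loop pattern the distance measures now use; the names (distanceOld, distanceNew) are illustrative only and do not appear in the commit:

// Sketch only: the before/after shape of a Euclidean distance over Array[Double].
object DistanceSketch
{
  // Before: each call allocates a zipped array and a mapped array, and boxes doubles.
  def distanceOld(a: Array[Double], b: Array[Double]): Double =
    Math.sqrt(a.zip(b).map { case (x, y) => Math.pow(x - y, 2) }.sum)

  // After: one pass over the arrays, no intermediate collections.
  def distanceNew(a: Array[Double], b: Array[Double]): Double =
  {
    var tot = 0.0
    var i = 0
    while(i < a.size)
    {
      val v = a(i) - b(i)
      tot += v * v
      i += 1
    }
    Math.sqrt(tot)
  }
}

The cubic behavior is addressed separately: instead of re-scanning every cluster referenced by the merge queue after each merge, the new bottom-up loop asks the KD-tree for the nearest centroid with tree.nearest(centroid, measure) and registers the merged cluster with tree.insert(centroid, newIdx), i.e. one index lookup per merge.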

View File

@@ -4,6 +4,7 @@ import scala.collection.mutable
import scala.reflect.ClassTag
import scala.collection.mutable.PriorityQueue
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Stack
object HierarchicalClustering
{
@@ -52,10 +53,8 @@ object HierarchicalClustering
}
Group(
Array(
naive(left.toIndexedSeq.map { elements(_) }, measure),
naive(right.toIndexedSeq.map { elements(_) }, measure),
),
left = naive(left.toIndexedSeq.map { elements(_) }, measure),
right = naive(right.toIndexedSeq.map { elements(_) }, measure),
radius = furthestDistance,
size = elements.size,
)
@@ -114,31 +113,24 @@ object HierarchicalClustering
val newCluster =
Group(
Array[Cluster[I, V]](aCluster, bCluster),
left = aCluster,
right = bCluster,
radius = radius,
size = aCluster.size + bCluster.size
)
if(queue.isEmpty){
if(tree.isEmpty){
return newCluster
} else {
val distances =
queue.map { _._2 }
.flatMap { i => clusters(i).map { i -> _ } }
.map { case (idx, (pos, _)) =>
idx -> measure.pointToPoint(centroid, pos)
}
// the initial check is incomplete... it's possible that all the elements in the
// queue have already been merged. If *this* is empty, then we actually don't
// have more work to do.
if(distances.isEmpty){ return newCluster }
// If we're here, we have at least one more pair to merge
val newIdx = clusters.size
clusters.append( Some(centroid, newCluster) )
val (closestIdx, distance) = distances.maxBy { _._2 }
val (nearestPosition, nearestIdx, nearestDistance) =
tree.nearest(centroid, measure).get
tree.insert(centroid, newIdx)
queue.enqueue(
(distance, newIdx, closestIdx)
(nearestDistance, newIdx, nearestIdx.head)
)
}
}
@@ -171,14 +163,50 @@ object HierarchicalClustering
}
}
class ClusterElementIterator[I, V](root: Cluster[I, V])
extends Iterator[Singleton[I, V]]
{
val stack = Stack[Group[I, V]]()
var nextSingleton: Singleton[I, V] = null
pushLeft(root)
def pushLeft(node: Cluster[I, V]): Unit =
{
var tmp = node;
while(tmp != null){
tmp = tmp match {
case g: Group[I, V] =>
{
stack.push(g)
g.left
}
case s: Singleton[I, V] =>
{
nextSingleton = s
null
}
}
}
}
def hasNext: Boolean = (nextSingleton != null)
def next: Singleton[I, V] =
{
val ret = nextSingleton
if(stack.isEmpty){ nextSingleton = null }
else { pushLeft(stack.pop.right) }
return ret
}
}
trait Cluster[I, V]
{
def radius: Double
def render(firstPrefix: String, restPrefix: String): String
def size: Int
def children: Array[Cluster[I, V]]
def children: Seq[Cluster[I, V]]
def orderedIterator = new ClusterIterator(this)
def elements: Iterator[Singleton[I, V]]
def elements = new ClusterElementIterator[I, V](this)
def threshold(cutoff: Double): Iterator[Cluster[I, V]] =
{
if(radius > cutoff){
@@ -190,33 +218,21 @@ object HierarchicalClustering
override def toString(): String = render("", "")
}
case class Group[I, V](children: Array[Cluster[I,V]], radius: Double, size: Int) extends Cluster[I, V]
case class Group[I, V](left: Cluster[I,V], right: Cluster[I, V], radius: Double, size: Int) extends Cluster[I, V]
{
def children = Seq(left, right)
def render(firstPrefix: String, restPrefix: String): String =
{
firstPrefix + s"- [$radius]\n" +
children.zipWithIndex.map { case (child, idx) =>
child.render(restPrefix + " +-",
if(idx == children.size - 1){
restPrefix + " "
} else {
restPrefix + " | "
}
)
}.mkString("\n")
}
def elements: Iterator[Singleton[I, V]] =
children.iterator.flatMap { _.elements }
left.render(restPrefix + " +-", restPrefix + " | ") + "\n" +
right.render(restPrefix + " +-", restPrefix + " ")
}
case class Singleton[I, V](position: Array[I], value: V) extends Cluster[I, V]
{
def children = Seq()
def radius = 0.0
def size = 1
def children = Array()
def render(firstPrefix: String, restPrefix: String): String =
firstPrefix + " <" + position.mkString(", ") + s"> -> $value"
def elements: Iterator[Singleton[I, V]] =
Some(this).iterator
}
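
For context, a small usage sketch of the reshaped cluster API above. The positions, radii, and labels are made up, and the import is an assumption about how the enclosing object is brought into scope:

import HierarchicalClustering._  // assumption: makes Group and Singleton visible

// A two-level tree built by hand.
val a = Singleton(Array(0.0, 0.0), "a")
val b = Singleton(Array(1.0, 0.0), "b")
val c = Singleton(Array(0.0, 1.0), "c")
val tiny =
  Group(
    left = Group(left = a, right = b, radius = 1.0, size = 2),
    right = c,
    radius = 1.5,
    size = 3
  )

// The new ClusterElementIterator streams the leaves using an explicit stack
// rather than chaining one flatMap iterator per Group.
tiny.elements.foreach { s => println(s.value) }  // prints a, b, c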

View File

@@ -5,6 +5,7 @@ class KDTree[I: Ordering, V](dimensions: Int)(implicit iTag: ClassTag[I])
{
type Key = Array[I]
var root: Option[Node] = None
private var _size = 0
def insert(position: Key, value: V): Unit =
{
@@ -13,6 +14,7 @@ class KDTree[I: Ordering, V](dimensions: Int)(implicit iTag: ClassTag[I])
case Some(node) => node.insert(position, value)
case None => Leaf(position, Set(value), 0)
})
_size = _size + 1
}
def insertAll(elements: Iterable[(Key, V)]): Unit =
@@ -25,6 +27,7 @@ class KDTree[I: Ordering, V](dimensions: Int)(implicit iTag: ClassTag[I])
case Some(node) => node.insertAll(elements.toSeq)
case None => fragmentSeq(elements.toSeq, 0)
})
_size = _size + elements.size
}
def remove(position: Key, value: V): Boolean =
@@ -35,11 +38,15 @@ class KDTree[I: Ordering, V](dimensions: Int)(implicit iTag: ClassTag[I])
{
val (newRoot, ret) = r.remove(position, value)
root = newRoot
_size = _size - 1
return ret
}
}
}
def isEmpty = root.isEmpty
def size = _size
def nearest(position: Key, measure: Distance[I], ignore: (Array[I], Set[V], Double) => Boolean = {(_, _, _) => false}): Option[(Key, Set[V], Double)] =
{
root.flatMap { _.nearest(
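
A quick usage sketch of the index and its new size counter; the data size and query point are arbitrary, and TestData here refers to the new org.mimirdb.pip.lib.TestData added later in this commit:

// Sketch: exercising the KD-tree with the synthetic TestData points.
val index = new KDTree[Double, Int](dimensions = TestData.DIMENSIONS)
index.insertAll(TestData.makeData(100).zipWithIndex)
println(index.size)  // 100, maintained by the new _size counter
val hit = index.nearest(Array.fill(TestData.DIMENSIONS)(0.5), TestData.EuclideanDistance)
// hit: Option[(Array[Double], Set[Int], Double)] -- nearest position, the values stored there, and its distance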

View File

@@ -12,4 +12,10 @@ object Time {
def apply[V](label: String)(f: => V): V =
apply( t => println(s"[$label] $t s") )(f)
}
case class Timer(label: String)
{
var tot = 0.0
def apply[V](f: => V): V = Time.apply( tot += _ ){ f }
}
}
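
A usage sketch for the new accumulating Timer; expensiveCall() is a hypothetical stand-in, and depending on nesting the constructor may need to be written Time.Timer:

// Sketch: Timer adds up total elapsed seconds across repeated invocations.
val distanceTime = Timer("distance")
for(_ <- 0 until 1000)
{
  distanceTime { expensiveCall() }
}
println(s"${distanceTime.label}: ${distanceTime.tot} s")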

View File

@@ -83,6 +83,12 @@ class HierarchicalClusteringTests extends AnyFlatSpec {
}
}
it should "run fast" in {
val data = TestData.makeData(500).zipWithIndex
Time("Hierarchical@500"){
HierarchicalClustering.bottomUp(data, EuclideanDistance)
}
}
it should "perform sensibly" in {
@@ -94,15 +100,14 @@ class HierarchicalClusteringTests extends AnyFlatSpec {
val datasets = Seq(
100,
200,
300,
400,
250,
500,
600,
700,
800,
900,
1000,
2500,
5000,
10000,
// 25000,
// 50000
).map { s =>
s -> TestData.makeData(s).zipWithIndex
}
@@ -112,7 +117,7 @@ class HierarchicalClusteringTests extends AnyFlatSpec {
log("size, time_s")
for( (size, data) <- datasets )
{
val skip = (strategy == "Naive" && size > 400)
val skip = (strategy == "Naive" && size >= 1000)
if(!skip){
Time(t => { log(s"$size, $t"); println(s"$strategy @ $size: $t s") }){
makeClusters(data, EuclideanDistance)

View File

@@ -26,25 +26,44 @@ object TestData
}
def centroid(a: Iterable[Array[Double]]): Array[Double] =
{
(0 until DIMENSIONS)
.map { i => a.map { _(i) }.sum / a.size }
.toArray
val ret = Array.ofDim[Double](DIMENSIONS)
for(pt <- a)
{
for(i <- 0 until DIMENSIONS)
{
ret(i) += pt(i)
}
}
for(i <- 0 until DIMENSIONS)
{
ret(i) /= a.size
}
return ret
}
}
object ManhattanDistance extends BaseDistance
{
def pointToPoint(a: Array[Double], b: Array[Double]): Double =
{
a.zip(b).map { case (a, b) => Math.abs(a - b) }.sum
var tot = 0.0
for(i <- 0 until a.size)
{
tot += Math.abs(a(i) - b(i))
}
return tot
}
}
object EuclideanDistance extends BaseDistance
{
def pointToPoint(a: Array[Double], b: Array[Double]): Double =
{
Math.sqrt(
a.zip(b).map { case (a, b) => Math.pow(Math.abs(a - b), 2) }.sum
)
var tot = 0.0
for(i <- 0 until a.size)
{
val v = Math.abs(a(i) - b(i))
tot += v * v
}
Math.sqrt(tot)
}
}

View File

@@ -18,6 +18,9 @@ import org.apache.sedona.spark.SedonaContext
import org.apache.logging.log4j.core.tools.picocli.CommandLine.Help.Column
import org.mimirdb.pip.distribution._
import scala.util.Random
import org.mimirdb.pip.lib._
object Main
{
@@ -28,167 +31,178 @@ object Main
def main(args: Array[String]): Unit =
{
val spark = SparkSession.builder
.appName("pip")
.master("local[*]")
.getOrCreate()
val size =
args.headOption.map { _.toInt }.getOrElse { 4000 }
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.setCheckpointDir("spark-warehouse")
println(s"Size: $size")
Pip.init(spark)
/*
Reproducing Vizier mars_rover workflow
NOTE:
this will probably be migrated into its own files later down the road
*/
import org.apache.spark.sql.DataFrameReader
/* It appears hadoop doesn't like urls, so need to
i) download the file locally
ii) then read it in
*/
val webData = "https://mars.nasa.gov/mmgis-maps/M20/Layers/json/M20_waypoints.json"
val fileData: String = "marsRoverData.json"
/* We don't need to download if we already have the file.
Source: http://stackoverflow.com/questions/21177107/ddg#21178667
*/
if (!Files.exists(Paths.get(fileData))) {
println("We didn't find the file. Now downloading...")
fileDownload(webData, fileData)
val data = TestData.makeData(size).zipWithIndex
Time(s"Hierarchical@$size"){
HierarchicalClustering.bottomUp(data, TestData.EuclideanDistance)
}
println("We did find the file, now reading.")
assert(SedonaContext.create(spark) eq spark)
var df = spark.read.option("multiLine", true).json(fileData)
/* Create temporary Spark view to query */
df.createOrReplaceTempView("trips")
////////////////////////////////////////////////////////
// Extract GeoJSON and properties field from the data
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT features.type,
features.properties.*,
ST_GeomFromGeoJSON(to_json(features.geometry)) as geo
FROM (
SELECT explode(features) AS features FROM trips
)
""").coalesce(1)
// sqlDF.printSchema()
// df.show(false)
df.createOrReplaceTempView("traverse_data")
////////////////////////////////////////////////////////
// Trip Times
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT *,
dist_km - lag(dist_km, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS km_traveled,
sol - lag(sol, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS sols_traveled
FROM traverse_data
WHERE dist_km > 0
""")
// df.show(false)
df.createOrReplaceTempView("traverse_data")
// spark.sql("""
// SELECT max(km_traveled * 1000 / sols_traveled) as m_per_sol
// val spark = SparkSession.builder
// .appName("pip")
// .master("local[*]")
// .getOrCreate()
// spark.sparkContext.setLogLevel("WARN")
// spark.sparkContext.setCheckpointDir("spark-warehouse")
// Pip.init(spark)
// /*
// Reproducing Vizier mars_rover workflow
// NOTE:
// this will probably be migrated into its own files later down the road
// */
// import org.apache.spark.sql.DataFrameReader
// /* It appears hadoop doesn't like urls, so need to
// i) download the file locally
// ii) then read it in
// */
// val webData = "https://mars.nasa.gov/mmgis-maps/M20/Layers/json/M20_waypoints.json"
// val fileData: String = "marsRoverData.json"
// /* We don't need to download if we already have the file.
// Source: http://stackoverflow.com/questions/21177107/ddg#21178667
// */
// if (!Files.exists(Paths.get(fileData))) {
// println("We didn't find the file. Now downloading...")
// fileDownload(webData, fileData)
// }
// println("We did find the file, now reading.")
// assert(SedonaContext.create(spark) eq spark)
// var df = spark.read.option("multiLine", true).json(fileData)
// /* Create temporary Spark view to query */
// df.createOrReplaceTempView("trips")
// ////////////////////////////////////////////////////////
// // Extract GeoJSON and properties field from the data
// ////////////////////////////////////////////////////////
// df = spark.sql("""
// SELECT features.type,
// features.properties.*,
// ST_GeomFromGeoJSON(to_json(features.geometry)) as geo
// FROM (
// SELECT explode(features) AS features FROM trips
// )
// """).coalesce(1)
// // sqlDF.printSchema()
// // df.show(false)
// df.createOrReplaceTempView("traverse_data")
// ////////////////////////////////////////////////////////
// // Trip Times
// ////////////////////////////////////////////////////////
// df = spark.sql("""
// SELECT *,
// dist_km - lag(dist_km, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS km_traveled,
// sol - lag(sol, 1, 0) OVER (PARTITION BY 1 ORDER BY sol) AS sols_traveled
// FROM traverse_data
// WHERE dist_km > 0
// """)
// // df.show(false)
// df.createOrReplaceTempView("traverse_data")
// // spark.sql("""
// // SELECT max(km_traveled * 1000 / sols_traveled) as m_per_sol
// // FROM traverse_data
// // WHERE sols_traveled > 0
// // """).show()
// // return
// ////////////////////////////////////////////////////////
// // Trip Distances
// ////////////////////////////////////////////////////////
// df = spark.sql("""
// SELECT ST_Point(
// CAST(lon as decimal(24,20)),
// CAST(lat as decimal(24,20))
// ) as geometry,
// clamp(gaussian(cast(km_traveled as double) * 1000 / sols_traveled, 3.0), 0.0, 800) as m_per_sol
// FROM traverse_data
// WHERE sols_traveled > 0
// """).show()
// return
// """)//.checkpoint()
// // tripDist.printSchema()
// // df.show(false)
// df.createOrReplaceTempView("trip_points")
////////////////////////////////////////////////////////
// Trip Distances
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT ST_Point(
CAST(lon as decimal(24,20)),
CAST(lat as decimal(24,20))
) as geometry,
clamp(gaussian(cast(km_traveled as double) * 1000 / sols_traveled, 3.0), 0.0, 800) as m_per_sol
FROM traverse_data
WHERE sols_traveled > 0
""")//.checkpoint()
// tripDist.printSchema()
// ////////////////////////////////////////////////////////
// // Bounding Box
// ////////////////////////////////////////////////////////
// df = spark.sql("""
// SELECT min(lat) as min_lat,
// max(lat) as max_lat,
// min(lon) as min_lon,
// max(lon) as max_lon
// FROM traverse_data
// """)
// // df.show(false)
// df.createOrReplaceTempView("mission_region")
// ////////////////////////////////////////////////////////
// // Example Histogram Regions
// ////////////////////////////////////////////////////////
// df = spark.sql("""
// SELECT id, ST_PolygonFromEnvelope(lon_low, lat_low, lon_high, lat_high) as geometry
// FROM (
// SELECT
// 10 * lon_idx + lat_idx AS id,
// (max_lat - min_lat)/10 * lat_idx + min_lat AS lat_low,
// (max_lat - min_lat)/10 * (lat_idx+1) + min_lat AS lat_high,
// (max_lon - min_lon)/10 * lon_idx + min_lon AS lon_low,
// (max_lon - min_lon)/10 * (lon_idx+1) + min_lon AS lon_high
// FROM (SELECT id AS lon_idx from range(0,10)) AS lon_split,
// (SELECT id AS lat_idx from range(0,10)) AS lat_split,
// mission_region
// )
// """)
// // df.show(false)
// df.createOrReplaceTempView("bounding_boxes")
// ////////////////////////////////////////////////////////
// // Per-region distributions
// ////////////////////////////////////////////////////////
// df = spark.sql("""
// SELECT box.id,
// array_agg(m_per_sol) as components,
// discretize(
// uniform_mixture(m_per_sol),
// array(0.0, 40.0, 80.0, 120.0, 160.0, 200.0, 240.0, 280.0, 320.0, 360.0, 400.0),
// 1000
// ) as m_per_sol
// FROM trip_points point,
// bounding_boxes box
// WHERE ST_Contains(box.geometry, point.geometry)
// GROUP BY box.id
// """)
// // df.show(false)
// df.createOrReplaceTempView("grid_squares")
// ////////////////////////////////////////////////////////
// // Per-region metrics
// ////////////////////////////////////////////////////////
// df = spark.sql("""
// SELECT id,
// m_per_sol,
// entropy(m_per_sol) as entropy,
// array_max(
// transform(
// components,
// x -> kl_divergence(x, m_per_sol)
// )
// ) as max_kl_div
// -- components
// FROM grid_squares
// -- LIMIT 1
// """)
// df.show(false)
df.createOrReplaceTempView("trip_points")
////////////////////////////////////////////////////////
// Bounding Box
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT min(lat) as min_lat,
max(lat) as max_lat,
min(lon) as min_lon,
max(lon) as max_lon
FROM traverse_data
""")
// df.show(false)
df.createOrReplaceTempView("mission_region")
////////////////////////////////////////////////////////
// Example Histogram Regions
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT id, ST_PolygonFromEnvelope(lon_low, lat_low, lon_high, lat_high) as geometry
FROM (
SELECT
10 * lon_idx + lat_idx AS id,
(max_lat - min_lat)/10 * lat_idx + min_lat AS lat_low,
(max_lat - min_lat)/10 * (lat_idx+1) + min_lat AS lat_high,
(max_lon - min_lon)/10 * lon_idx + min_lon AS lon_low,
(max_lon - min_lon)/10 * (lon_idx+1) + min_lon AS lon_high
FROM (SELECT id AS lon_idx from range(0,10)) AS lon_split,
(SELECT id AS lat_idx from range(0,10)) AS lat_split,
mission_region
)
""")
// df.show(false)
df.createOrReplaceTempView("bounding_boxes")
////////////////////////////////////////////////////////
// Per-region distributions
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT box.id,
array_agg(m_per_sol) as components,
discretize(
uniform_mixture(m_per_sol),
array(0.0, 40.0, 80.0, 120.0, 160.0, 200.0, 240.0, 280.0, 320.0, 360.0, 400.0),
1000
) as m_per_sol
FROM trip_points point,
bounding_boxes box
WHERE ST_Contains(box.geometry, point.geometry)
GROUP BY box.id
""")
// df.show(false)
df.createOrReplaceTempView("grid_squares")
////////////////////////////////////////////////////////
// Per-region metrics
////////////////////////////////////////////////////////
df = spark.sql("""
SELECT id,
m_per_sol,
entropy(m_per_sol) as entropy,
array_max(
transform(
components,
x -> kl_divergence(x, m_per_sol)
)
) as max_kl_div
-- components
FROM grid_squares
-- LIMIT 1
""")
df.show(false)
}
}

View File

@@ -0,0 +1,78 @@
package org.mimirdb.pip.lib
import org.mimirdb.pip.distribution.Discretized
import scala.util.Random
object TestData
{
val DIMENSIONS = 10
val BIN_SIZE = 1.0 / DIMENSIONS
def positionToBins(position: Array[Double]): Seq[Discretized.Bin] =
{
position.zipWithIndex.map { case (b, i) =>
Discretized.Bin(i * BIN_SIZE, (i+1) * BIN_SIZE, b)
}.toSeq
}
trait BaseDistance extends Distance[Double]
{
val min = 0.0
val max = 1.0
def pointToPlane(a: Array[Double], b: Double, dim: Int): Double =
{
Math.abs(a(dim) - b)
}
def centroid(a: Iterable[Array[Double]]): Array[Double] =
{
val ret = Array.ofDim[Double](DIMENSIONS)
for(pt <- a)
{
for(i <- 0 until DIMENSIONS)
{
ret(i) += pt(i)
}
}
for(i <- 0 until DIMENSIONS)
{
ret(i) /= a.size
}
return ret
}
}
object ManhattanDistance extends BaseDistance
{
def pointToPoint(a: Array[Double], b: Array[Double]): Double =
{
var tot = 0.0
for(i <- 0 until a.size)
{
tot += Math.abs(a(i) - b(i))
}
return tot
}
}
object EuclideanDistance extends BaseDistance
{
def pointToPoint(a: Array[Double], b: Array[Double]): Double =
{
var tot = 0.0
for(i <- 0 until a.size)
{
val v = Math.abs(a(i) - b(i))
tot += v * v
}
Math.sqrt(tot)
}
}
def makeData(size: Int): IndexedSeq[Array[Double]] =
{
(0 until size).map { _ =>
(0 until DIMENSIONS).map { _ =>
Random.nextDouble
}.toArray
}.toArray.toIndexedSeq
}
}