Merge branch 'master' of github.com:andrewor14/incubator-spark
commit 8bbe08b21e

SamplingSizeTracker.scala (deleted):

@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.apache.spark.util.SamplingSizeTracker.Sample
-
-/**
- * Estimates the size of an object as it grows, in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
- *
- * Users should call updateMade() every time their object is updated with new data, or
- * flushSamples() if there is a non-linear change in object size (otherwise linear is assumed).
- * Not threadsafe.
- */
-private[spark] class SamplingSizeTracker(obj: AnyRef) {
-  /**
-   * Controls the base of the exponential which governs the rate of sampling.
-   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-   */
-  private val SAMPLE_GROWTH_RATE = 1.1
-
-  private var lastLastSample: Sample = _
-  private var lastSample: Sample = _
-
-  private var numUpdates: Long = _
-  private var nextSampleNum: Long = _
-
-  flushSamples()
-
-  /** Called after a non-linear change in the tracked object. Takes a new sample. */
-  def flushSamples() {
-    numUpdates = 0
-    nextSampleNum = 1
-    // Throw out both prior samples to avoid overestimating delta.
-    lastSample = Sample(SizeEstimator.estimate(obj), 0)
-    lastLastSample = lastSample
-  }
-
-  /** To be called after an update to the tracked object. Amortized O(1) time. */
-  def updateMade() {
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) {
-      lastLastSample = lastSample
-      lastSample = Sample(SizeEstimator.estimate(obj), numUpdates)
-      nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-    }
-  }
-
-  /** Estimates the current size of the tracked object. O(1) time. */
-  def estimateSize(): Long = {
-    val interpolatedDelta =
-      if (lastLastSample != null && lastLastSample != lastSample) {
-        (lastSample.size - lastLastSample.size).toDouble /
-          (lastSample.numUpdates - lastLastSample.numUpdates)
-      } else if (lastSample.numUpdates > 0) {
-        lastSample.size.toDouble / lastSample.numUpdates
-      } else {
-        0
-      }
-    val extrapolatedDelta = math.max(0, interpolatedDelta * (numUpdates - lastSample.numUpdates))
-    (lastSample.size + extrapolatedDelta).toLong
-  }
-}
-
-object SamplingSizeTracker {
-  case class Sample(size: Long, numUpdates: Long)
-}
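Both the deleted tracker above and its replacement below rely on the same trick: measuring an object with SizeEstimator costs milliseconds, so instead of measuring on every update, samples are taken at update counts that grow geometrically. That makes updateMade() amortized O(1), with only O(log n) full measurements over n updates. A standalone sketch of the schedule this produces (the 1.1 constant is copied from SAMPLE_GROWTH_RATE above; everything else is illustrative):

object SampleSchedule {
  def main(args: Array[String]) {
    val growthRate = 1.1  // same value as SAMPLE_GROWTH_RATE in the code above
    var nextSampleNum = 1L
    var numSamples = 0
    val firstFew = scala.collection.mutable.ArrayBuffer[Long]()
    for (numUpdates <- 1L to 100000L) {
      if (numUpdates == nextSampleNum) {  // mirrors the check in updateMade()
        numSamples += 1
        if (firstFew.size < 14) firstFew += numUpdates
        nextSampleNum = math.ceil(numUpdates * growthRate).toLong
      }
    }
    println(firstFew.mkString(", "))  // 1, 2, 3, ..., 10, 11, 13, 15, 17
    println(s"$numSamples samples over 100000 updates")  // on the order of 100
  }
}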
SizeTrackingAppendOnlyMap.scala:

@@ -17,28 +17,85 @@
 
 package org.apache.spark.util.collection
 
-import org.apache.spark.util.SamplingSizeTracker
+import scala.collection.mutable.ArrayBuffer
 
-/** Append-only map that keeps track of its estimated size in bytes. */
+import org.apache.spark.util.SizeEstimator
+import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
+
+/**
+ * Append-only map that keeps track of its estimated size in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ */
 private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
 
-  private val sizeTracker = new SamplingSizeTracker(this)
+  /**
+   * Controls the base of the exponential which governs the rate of sampling.
+   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+   */
+  private val SAMPLE_GROWTH_RATE = 1.1
 
-  def estimateSize() = sizeTracker.estimateSize()
+  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
+  private val samples = new ArrayBuffer[Sample]()
+
+  /** Total number of insertions and updates into the map since the last resetSamples(). */
+  private var numUpdates: Long = _
+
+  /** The value of 'numUpdates' at which we will take our next sample. */
+  private var nextSampleNum: Long = _
+
+  /** The average number of bytes per update between our last two samples. */
+  private var bytesPerUpdate: Double = _
+
+  resetSamples()
+
+  /** Called after the map grows in size, as this can be a dramatic change for small objects. */
+  def resetSamples() {
+    numUpdates = 1
+    nextSampleNum = 1
+    samples.clear()
+    takeSample()
+  }
 
   override def update(key: K, value: V): Unit = {
     super.update(key, value)
-    sizeTracker.updateMade()
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) { takeSample() }
   }
 
   override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
     val newValue = super.changeValue(key, updateFunc)
-    sizeTracker.updateMade()
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) { takeSample() }
     newValue
   }
 
+  /** Takes a new sample of the current map's size. */
+  def takeSample() {
+    samples += Sample(SizeEstimator.estimate(this), numUpdates)
+    // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
+    bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
+      case latest :: previous :: tail =>
+        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+      case _ =>
+        0
+    })
+    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+  }
+
   override protected def growTable() {
     super.growTable()
-    sizeTracker.flushSamples()
+    resetSamples()
   }
+
+  /** Estimates the current size of the map in bytes. O(1) time. */
+  def estimateSize(): Long = {
+    assert(samples.nonEmpty)
+    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+    (samples.last.size + extrapolatedDelta).toLong
+  }
 }
+
+object SizeTrackingAppendOnlyMap {
+  case class Sample(size: Long, numUpdates: Long)
+}
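To make the extrapolation in estimateSize() concrete: if the previous sample was (910000 bytes, update 900) and the latest is (1000000 bytes, update 1000), then bytesPerUpdate = 90000 / 100 = 900, so at update 1050 the estimate is 1000000 + 900 * 50 = 1045000 bytes. Resetting the samples in growTable() is deliberate: rehashing changes the map's footprint non-linearly, which would otherwise poison that per-update rate. A usage sketch, not part of the commit (it assumes the classes in this diff are compiled, and sits in the same package to satisfy private[spark]):

package org.apache.spark.util.collection

// Hypothetical driver: the object name, value type, and sizes are illustrative only.
object SizeTrackingExample {
  def main(args: Array[String]) {
    val map = new SizeTrackingAppendOnlyMap[Int, Array[Byte]]
    for (i <- 0 until 10000) {
      map(i) = new Array[Byte](64)  // each update() may trigger a sample
      if ((i + 1) % 2000 == 0) {
        // O(1): extrapolates from the last two samples, no SizeEstimator call
        println(s"~${map.estimateSize()} bytes after ${i + 1} updates")
      }
    }
  }
}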
The test suite (SamplingSizeTrackerSuite renamed to SizeTrackingAppendOnlyMapSuite):

@@ -21,10 +21,10 @@ import scala.util.Random
 
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.util.SamplingSizeTrackerSuite.LargeDummyClass
+import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
 import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
 
-class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll {
+class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
   val NORMAL_ERROR = 0.20
   val HIGH_ERROR = 0.30
 
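NORMAL_ERROR and HIGH_ERROR are relative tolerances: an estimate passes if it lands within 20% (or 30%, for the harder workloads) of a fresh, full SizeEstimator measurement. The suite's actual assertion helper is outside these hunks; a hypothetical sketch of such a check:

// Hypothetical helper, not taken from the suite itself.
def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
  val referenceSize = SizeEstimator.estimate(obj)  // slow but accurate baseline
  assert(math.abs(estimatedSize - referenceSize) <= referenceSize * error,
    s"estimate $estimatedSize not within $error of reference $referenceSize")
}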
@@ -70,24 +70,24 @@ class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll {
   }
 }
 
-object SamplingSizeTrackerSuite {
+object SizeTrackingAppendOnlyMapSuite {
   // Speed test, for reproducibility of results.
   // These could be highly non-deterministic in general, however.
   // Results:
-  // AppendOnlyMap: 30 ms
-  // SizeTracker: 45 ms
+  // AppendOnlyMap: 31 ms
+  // SizeTracker: 54 ms
   // SizeEstimator: 1500 ms
   def main(args: Array[String]) {
     val numElements = 100000
 
-    val baseTimes = for (i <- 0 until 3) yield time {
+    val baseTimes = for (i <- 0 until 10) yield time {
       val map = new AppendOnlyMap[Int, LargeDummyClass]()
       for (i <- 0 until numElements) {
         map(i) = new LargeDummyClass()
       }
     }
 
-    val sampledTimes = for (i <- 0 until 3) yield time {
+    val sampledTimes = for (i <- 0 until 10) yield time {
       val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
       for (i <- 0 until numElements) {
         map(i) = new LargeDummyClass()
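Read against the recorded results, the benchmark's point is the amortization: size tracking costs well under 2x a plain AppendOnlyMap (31 ms vs 54 ms per 100,000 inserts), while a full SizeEstimator pass (1500 ms) is dozens of times slower. Each run is wrapped in a time { ... } helper whose definition lies outside these hunks; a minimal sketch of the assumed shape, returning elapsed milliseconds:

// Assumed shape of the helper; the suite's real definition is not shown here.
def time(block: => Unit): Long = {
  val start = System.currentTimeMillis()
  block  // run the code under measurement
  System.currentTimeMillis() - start
}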