Merge pull request #169 from shivaram/master
Changes to make SizeEstimator more accurate
This commit is contained in:
commit
680df96c43
|
@ -7,6 +7,10 @@ import java.util.IdentityHashMap
|
|||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.Random
|
||||
|
||||
import javax.management.MBeanServer
|
||||
import java.lang.management.ManagementFactory
|
||||
import com.sun.management.HotSpotDiagnosticMXBean
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import it.unimi.dsi.fastutil.ints.IntOpenHashSet
|
||||
|
@ -18,9 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet
|
|||
* Based on the following JavaWorld article:
|
||||
* http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
|
||||
*/
|
||||
object SizeEstimator {
|
||||
private val OBJECT_SIZE = 8 // Minimum size of a java.lang.Object
|
||||
private val POINTER_SIZE = 4 // Size of an object reference
|
||||
object SizeEstimator extends Logging {
|
||||
|
||||
// Sizes of primitive types
|
||||
private val BYTE_SIZE = 1
|
||||
|
@ -32,9 +34,68 @@ object SizeEstimator {
|
|||
private val FLOAT_SIZE = 4
|
||||
private val DOUBLE_SIZE = 8
|
||||
|
||||
// Alignment boundary for objects
|
||||
// TODO: Is this arch dependent ?
|
||||
private val ALIGN_SIZE = 8
|
||||
|
||||
// A cache of ClassInfo objects for each class
|
||||
private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
|
||||
classInfos.put(classOf[Object], new ClassInfo(OBJECT_SIZE, Nil))
|
||||
|
||||
// Object and pointer sizes are arch dependent
|
||||
private var is64bit = false
|
||||
|
||||
// Size of an object reference
|
||||
// Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
|
||||
private var isCompressedOops = false
|
||||
private var pointerSize = 4
|
||||
|
||||
// Minimum size of a java.lang.Object
|
||||
private var objectSize = 8
|
||||
|
||||
initialize()
|
||||
|
||||
/**
 * Sets the object header size and the reference size from the JVM architecture and its
 * CompressedOops setting, then resets the ClassInfo cache so it is rebuilt with those sizes.
 *
 * The architecture is read from the "os.arch" system property; the CompressedOops flag comes
 * from getIsCompressedOops.
 */
private def initialize(): Unit = {
  is64bit = System.getProperty("os.arch").contains("64")
  isCompressedOops = getIsCompressedOops

  // Header: 8 bytes on 32-bit, 12 on 64-bit with compressed oops, 16 on 64-bit without.
  objectSize = (is64bit, isCompressedOops) match {
    case (false, _) => 8
    case (true, true) => 12
    case (true, false) => 16
  }

  // References are 8 bytes only on 64-bit JVMs that do not compress oops.
  pointerSize = if (!is64bit || isCompressedOops) 4 else 8

  // Cached shell sizes were computed with the previous settings, so drop them and re-seed
  // the cache with java.lang.Object.
  classInfos.clear()
  classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
}
|
||||
|
||||
/**
 * Determines whether the JVM uses compressed object pointers.
 *
 * If the "spark.test.useCompressedOops" system property is set, its boolean value is returned
 * directly. Otherwise the HotSpot diagnostic MXBean is asked for the "UseCompressedOops" VM
 * option. If that lookup fails for any reason, a warning is logged and the answer is guessed
 * from the maximum heap size (true when it is below 32GB).
 *
 * @return true if compressed oops are (believed to be) enabled
 */
private def getIsCompressedOops: Boolean = {
  // Fall back to checking if maxMemory < 32GB
  def guessFromMaxMemory: Boolean = Runtime.getRuntime.maxMemory < (32L * 1024 * 1024 * 1024)

  // Read the override once so the null check and the conversion see the same value.
  val testOverride = System.getProperty("spark.test.useCompressedOops")
  if (testOverride != null) {
    testOverride.toBoolean
  } else {
    try {
      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
      val server = ManagementFactory.getPlatformMBeanServer()
      val bean = ManagementFactory.newPlatformMXBeanProxy(server,
        hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean])
      bean.getVMOption("UseCompressedOops").getValue.toBoolean
    } catch {
      case e: SecurityException =>
        logWarning("No permission to create MBeanServer", e)
        guessFromMaxMemory

      // Covers IllegalArgumentException as before, plus other failures such as the
      // IOException that newPlatformMXBeanProxy may raise, so that a failed probe never
      // aborts initialization of the estimator.
      case e: Exception =>
        logWarning("Exception while trying to check if compressed oops is enabled", e)
        guessFromMaxMemory
    }
  }
}
|
||||
|
||||
/**
|
||||
* The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
|
||||
|
@ -101,10 +162,17 @@ object SizeEstimator {
|
|||
private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
|
||||
val length = JArray.getLength(array)
|
||||
val elementClass = cls.getComponentType
|
||||
|
||||
// Arrays have object header and length field which is an integer
|
||||
var arrSize: Long = alignSize(objectSize + INT_SIZE)
|
||||
|
||||
if (elementClass.isPrimitive) {
|
||||
state.size += length * primitiveSize(elementClass)
|
||||
arrSize += alignSize(length * primitiveSize(elementClass))
|
||||
state.size += arrSize
|
||||
} else {
|
||||
state.size += length * POINTER_SIZE
|
||||
arrSize += alignSize(length * pointerSize)
|
||||
state.size += arrSize
|
||||
|
||||
if (length <= ARRAY_SIZE_FOR_SAMPLING) {
|
||||
for (i <- 0 until length) {
|
||||
state.enqueue(JArray.get(array, i))
|
||||
|
@ -170,15 +238,22 @@ object SizeEstimator {
|
|||
shellSize += primitiveSize(fieldClass)
|
||||
} else {
|
||||
field.setAccessible(true) // Enable future get()'s on this field
|
||||
shellSize += POINTER_SIZE
|
||||
shellSize += pointerSize
|
||||
pointerFields = field :: pointerFields
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shellSize = alignSize(shellSize)
|
||||
|
||||
// Create and cache a new ClassInfo
|
||||
val newInfo = new ClassInfo(shellSize, pointerFields)
|
||||
classInfos.put(cls, newInfo)
|
||||
return newInfo
|
||||
}
|
||||
|
||||
/**
 * Pads `size` so that it lands on an ALIGN_SIZE boundary.
 *
 * A value that is already a multiple of ALIGN_SIZE is returned unchanged; otherwise the
 * missing bytes up to the next multiple are added (for non-negative sizes this rounds up).
 */
private def alignSize(size: Long): Long = {
  val remainder = size % ALIGN_SIZE
  if (remainder != 0) {
    size + (ALIGN_SIZE - remainder)
  } else {
    size
  }
}
|
||||
}
|
||||
|
|
|
@ -1,31 +1,50 @@
|
|||
package spark
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
import org.scalatest.PrivateMethodTester
|
||||
|
||||
class BoundedMemoryCacheSuite extends FunSuite {
|
||||
class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester {
|
||||
test("constructor test") {
|
||||
val cache = new BoundedMemoryCache(40)
|
||||
expect(40)(cache.getCapacity)
|
||||
val cache = new BoundedMemoryCache(60)
|
||||
expect(60)(cache.getCapacity)
|
||||
}
|
||||
|
||||
test("caching") {
|
||||
val cache = new BoundedMemoryCache(40) {
|
||||
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
|
||||
val oldArch = System.setProperty("os.arch", "amd64")
|
||||
val oldOops = System.setProperty("spark.test.useCompressedOops", "true")
|
||||
val initialize = PrivateMethod[Unit]('initialize)
|
||||
SizeEstimator invokePrivate initialize()
|
||||
|
||||
val cache = new BoundedMemoryCache(60) {
|
||||
//TODO sorry about this, but there is no better way to skip 'cacheTracker.dropEntry'
|
||||
override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
|
||||
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
|
||||
}
|
||||
}
|
||||
//should be OK
|
||||
expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
|
||||
expect(CachePutSuccess(56))(cache.put("1", 0, "Meh"))
|
||||
|
||||
//we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
|
||||
//cache because it's from the same dataset
|
||||
expect(CachePutFailure())(cache.put("1", 1, "Meh"))
|
||||
|
||||
//should be OK, dataset '1' can be evicted from cache
|
||||
expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
|
||||
expect(CachePutSuccess(56))(cache.put("2", 0, "Meh"))
|
||||
|
||||
//should fail, cache should obey its capacity
|
||||
expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
|
||||
|
||||
if (oldArch != null) {
|
||||
System.setProperty("os.arch", oldArch)
|
||||
} else {
|
||||
System.clearProperty("os.arch")
|
||||
}
|
||||
|
||||
if (oldOops != null) {
|
||||
System.setProperty("spark.test.useCompressedOops", oldOops)
|
||||
} else {
|
||||
System.clearProperty("spark.test.useCompressedOops")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package spark
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.PrivateMethodTester
|
||||
|
||||
class DummyClass1 {}
|
||||
|
||||
|
@ -17,61 +19,114 @@ class DummyClass4(val d: DummyClass3) {
|
|||
val x: Int = 0
|
||||
}
|
||||
|
||||
class SizeEstimatorSuite extends FunSuite {
|
||||
class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
|
||||
var oldArch: String = _
|
||||
var oldOops: String = _
|
||||
|
||||
/**
 * Pins the JVM description seen by the estimator to 64-bit with compressed oops so that the
 * expected sizes in this suite are deterministic. The previous property values are kept in
 * oldArch / oldOops for afterAll to restore.
 */
override def beforeAll(): Unit = {
  oldOops = System.setProperty("spark.test.useCompressedOops", "true")
  oldArch = System.setProperty("os.arch", "amd64")
}
|
||||
|
||||
/** Puts back the system properties that beforeAll overrode. */
override def afterAll(): Unit = {
  resetOrClear("spark.test.useCompressedOops", oldOops)
  resetOrClear("os.arch", oldArch)
}
|
||||
|
||||
test("simple classes") {
|
||||
expect(8)(SizeEstimator.estimate(new DummyClass1))
|
||||
expect(12)(SizeEstimator.estimate(new DummyClass2))
|
||||
expect(20)(SizeEstimator.estimate(new DummyClass3))
|
||||
expect(16)(SizeEstimator.estimate(new DummyClass4(null)))
|
||||
expect(36)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
|
||||
expect(16)(SizeEstimator.estimate(new DummyClass1))
|
||||
expect(16)(SizeEstimator.estimate(new DummyClass2))
|
||||
expect(24)(SizeEstimator.estimate(new DummyClass3))
|
||||
expect(24)(SizeEstimator.estimate(new DummyClass4(null)))
|
||||
expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
|
||||
}
|
||||
|
||||
test("strings") {
|
||||
expect(24)(SizeEstimator.estimate(""))
|
||||
expect(26)(SizeEstimator.estimate("a"))
|
||||
expect(28)(SizeEstimator.estimate("ab"))
|
||||
expect(40)(SizeEstimator.estimate("abcdefgh"))
|
||||
expect(48)(SizeEstimator.estimate(""))
|
||||
expect(56)(SizeEstimator.estimate("a"))
|
||||
expect(56)(SizeEstimator.estimate("ab"))
|
||||
expect(64)(SizeEstimator.estimate("abcdefgh"))
|
||||
}
|
||||
|
||||
test("primitive arrays") {
|
||||
expect(10)(SizeEstimator.estimate(new Array[Byte](10)))
|
||||
expect(20)(SizeEstimator.estimate(new Array[Char](10)))
|
||||
expect(20)(SizeEstimator.estimate(new Array[Short](10)))
|
||||
expect(40)(SizeEstimator.estimate(new Array[Int](10)))
|
||||
expect(80)(SizeEstimator.estimate(new Array[Long](10)))
|
||||
expect(40)(SizeEstimator.estimate(new Array[Float](10)))
|
||||
expect(80)(SizeEstimator.estimate(new Array[Double](10)))
|
||||
expect(4000)(SizeEstimator.estimate(new Array[Int](1000)))
|
||||
expect(8000)(SizeEstimator.estimate(new Array[Long](1000)))
|
||||
expect(32)(SizeEstimator.estimate(new Array[Byte](10)))
|
||||
expect(40)(SizeEstimator.estimate(new Array[Char](10)))
|
||||
expect(40)(SizeEstimator.estimate(new Array[Short](10)))
|
||||
expect(56)(SizeEstimator.estimate(new Array[Int](10)))
|
||||
expect(96)(SizeEstimator.estimate(new Array[Long](10)))
|
||||
expect(56)(SizeEstimator.estimate(new Array[Float](10)))
|
||||
expect(96)(SizeEstimator.estimate(new Array[Double](10)))
|
||||
expect(4016)(SizeEstimator.estimate(new Array[Int](1000)))
|
||||
expect(8016)(SizeEstimator.estimate(new Array[Long](1000)))
|
||||
}
|
||||
|
||||
test("object arrays") {
|
||||
// Arrays containing nulls should just have one pointer per element
|
||||
expect(40)(SizeEstimator.estimate(new Array[String](10)))
|
||||
expect(40)(SizeEstimator.estimate(new Array[AnyRef](10)))
|
||||
expect(56)(SizeEstimator.estimate(new Array[String](10)))
|
||||
expect(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
|
||||
|
||||
// For object arrays with non-null elements, each object should take one pointer plus
|
||||
// however many bytes that class takes. (Note that Array.fill calls the code in its
|
||||
// second parameter separately for each object, so we get distinct objects.)
|
||||
expect(120)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
|
||||
expect(160)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
|
||||
expect(240)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
|
||||
expect(12 + 16)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
|
||||
expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
|
||||
expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
|
||||
expect(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
|
||||
expect(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
|
||||
|
||||
// Past size 100, the estimator samples 100 elements, but we should still get the right size.
|
||||
expect(24000)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
|
||||
expect(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
|
||||
|
||||
// If an array contains the *same* element many times, we should only count it once.
|
||||
val d1 = new DummyClass1
|
||||
expect(48)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
|
||||
expect(408)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
|
||||
expect(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
|
||||
expect(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
|
||||
|
||||
// Same thing with huge array containing the same element many times. Note that this won't
|
||||
// return exactly 4008 because it can't tell that *all* the elements will equal the first
|
||||
// return exactly 4032 because it can't tell that *all* the elements will equal the first
|
||||
// one it samples, but it should be close to that.
|
||||
|
||||
// TODO: If we sample 100 elements, this should always be 4176 ?
|
||||
val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
|
||||
assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000")
|
||||
assert(estimatedSize <= 4100, "Estimated size " + estimatedSize + " should be less than 4100")
|
||||
assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4100")
|
||||
}
|
||||
|
||||
// Checks string size estimates when the estimator is re-initialized for a 32-bit architecture.
test("32-bit arch") {
  val arch = System.setProperty("os.arch", "x86")

  try {
    // Re-run the estimator's private setup so it picks up the overridden os.arch.
    val initialize = PrivateMethod[Unit]('initialize)
    SizeEstimator invokePrivate initialize()

    expect(40)(SizeEstimator.estimate(""))
    expect(48)(SizeEstimator.estimate("a"))
    expect(48)(SizeEstimator.estimate("ab"))
    expect(56)(SizeEstimator.estimate("abcdefgh"))
  } finally {
    // Restore the property even when an expectation above fails, so the override cannot
    // leak into other tests running in the same JVM.
    resetOrClear("os.arch", arch)
  }
}
|
||||
|
||||
// Checks string size estimates when the estimator is re-initialized for a 64-bit architecture
// with compressed oops turned off.
test("64-bit arch with no compressed oops") {
  val arch = System.setProperty("os.arch", "amd64")
  val oops = System.setProperty("spark.test.useCompressedOops", "false")

  try {
    // Re-run the estimator's private setup so it picks up the overridden properties.
    val initialize = PrivateMethod[Unit]('initialize)
    SizeEstimator invokePrivate initialize()

    expect(64)(SizeEstimator.estimate(""))
    expect(72)(SizeEstimator.estimate("a"))
    expect(72)(SizeEstimator.estimate("ab"))
    expect(80)(SizeEstimator.estimate("abcdefgh"))
  } finally {
    // Restore both properties even when an expectation above fails, so the overrides cannot
    // leak into other tests running in the same JVM.
    resetOrClear("os.arch", arch)
    resetOrClear("spark.test.useCompressedOops", oops)
  }
}
|
||||
|
||||
/**
 * Restores a system property to a previously saved value.
 *
 * @param prop     name of the system property
 * @param oldValue value to restore; when null the property is removed instead
 */
def resetOrClear(prop: String, oldValue: String): Unit = {
  Option(oldValue) match {
    case Some(previous) => System.setProperty(prop, previous)
    case None => System.clearProperty(prop)
  }
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue