Merge pull request #171 from shivaram/for-size-estimator-pull

Size estimator changes for dev
This commit is contained in:
Matei Zaharia 2012-08-13 15:29:40 -07:00
commit 942e604c62
4 changed files with 224 additions and 53 deletions

View file

@ -7,6 +7,10 @@ import java.util.IdentityHashMap
import java.util.concurrent.ConcurrentHashMap
import java.util.Random
import javax.management.MBeanServer
import java.lang.management.ManagementFactory
import com.sun.management.HotSpotDiagnosticMXBean
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntOpenHashSet
@ -18,9 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet
* Based on the following JavaWorld article:
* http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
*/
object SizeEstimator {
private val OBJECT_SIZE = 8 // Minimum size of a java.lang.Object
private val POINTER_SIZE = 4 // Size of an object reference
object SizeEstimator extends Logging {
// Sizes of primitive types
private val BYTE_SIZE = 1
@ -32,9 +34,68 @@ object SizeEstimator {
private val FLOAT_SIZE = 4
private val DOUBLE_SIZE = 8
// Alignment boundary for objects
// TODO: Is this arch dependent ?
private val ALIGN_SIZE = 8
// A cache of ClassInfo objects for each class
private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
classInfos.put(classOf[Object], new ClassInfo(OBJECT_SIZE, Nil))
// Object and pointer sizes are arch dependent
private var is64bit = false
// Size of an object reference
// Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
private var isCompressedOops = false
private var pointerSize = 4
// Minimum size of a java.lang.Object
private var objectSize = 8
initialize()
// Sets object size, pointer size based on architecture and CompressedOops settings
// from the JVM.
private def initialize() {
  // "os.arch" is a standard JVM property, but guard against a null value
  // (the test suites in this project clear it via System.clearProperty)
  // instead of throwing an NPE.
  is64bit = Option(System.getProperty("os.arch")).exists(_.contains("64"))
  isCompressedOops = getIsCompressedOops

  // Object header size: 8 bytes on a 32-bit JVM; on 64-bit it is 12 with
  // compressed oops enabled and 16 without.
  objectSize = if (!is64bit) {
    8
  } else if (isCompressedOops) {
    12
  } else {
    16
  }
  // References are 8 bytes only on a 64-bit JVM without compressed oops.
  pointerSize = if (is64bit && !isCompressedOops) 8 else 4

  // Reset the ClassInfo cache: cached shell sizes depend on the object and
  // pointer sizes computed above, so stale entries would be wrong.
  classInfos.clear()
  classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
}
/**
 * Determines whether the JVM is running with CompressedOops enabled.
 *
 * The "spark.test.useCompressedOops" system property, when set, overrides
 * detection so tests can force a deterministic value. Otherwise the HotSpot
 * diagnostic MBean is queried; if that fails, falls back to a heap-size
 * heuristic.
 */
private def getIsCompressedOops: Boolean = {
  // Read the test override exactly once instead of calling getProperty twice.
  val testOverride = System.getProperty("spark.test.useCompressedOops")
  if (testOverride != null) {
    testOverride.toBoolean
  } else {
    try {
      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
      val server = ManagementFactory.getPlatformMBeanServer()
      val bean = ManagementFactory.newPlatformMXBeanProxy(
        server, hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean])
      bean.getVMOption("UseCompressedOops").getValue.toBoolean
    } catch {
      case e: IllegalArgumentException =>
        logWarning("Exception while trying to check if compressed oops is enabled", e)
        guessCompressedOops
      case e: SecurityException =>
        logWarning("No permission to create MBeanServer", e)
        guessCompressedOops
    }
  }
}

// Fallback heuristic when the HotSpot diagnostic MBean is unavailable:
// per https://wikis.oracle.com/display/HotSpotInternals/CompressedOops,
// compressed oops are enabled by default when maxMemory < 32GB.
private def guessCompressedOops: Boolean =
  Runtime.getRuntime.maxMemory < (32L * 1024 * 1024 * 1024)
/**
* The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
@ -101,10 +162,17 @@ object SizeEstimator {
private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
val length = JArray.getLength(array)
val elementClass = cls.getComponentType
// Arrays have object header and length field which is an integer
var arrSize: Long = alignSize(objectSize + INT_SIZE)
if (elementClass.isPrimitive) {
state.size += length * primitiveSize(elementClass)
arrSize += alignSize(length * primitiveSize(elementClass))
state.size += arrSize
} else {
state.size += length * POINTER_SIZE
arrSize += alignSize(length * pointerSize)
state.size += arrSize
if (length <= ARRAY_SIZE_FOR_SAMPLING) {
for (i <- 0 until length) {
state.enqueue(JArray.get(array, i))
@ -170,15 +238,22 @@ object SizeEstimator {
shellSize += primitiveSize(fieldClass)
} else {
field.setAccessible(true) // Enable future get()'s on this field
shellSize += POINTER_SIZE
shellSize += pointerSize
pointerFields = field :: pointerFields
}
}
}
shellSize = alignSize(shellSize)
// Create and cache a new ClassInfo
val newInfo = new ClassInfo(shellSize, pointerFields)
classInfos.put(cls, newInfo)
return newInfo
}
/**
 * Rounds `size` up to the nearest multiple of ALIGN_SIZE (the JVM object
 * alignment boundary); already-aligned sizes are returned unchanged.
 */
private def alignSize(size: Long): Long = {
  val rem = size % ALIGN_SIZE
  // The last expression is the result; the redundant `return` keyword is dropped.
  if (rem == 0) size else size + ALIGN_SIZE - rem
}
}

View file

@ -1,31 +1,50 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.PrivateMethodTester
class BoundedMemoryCacheSuite extends FunSuite {
class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester {
test("constructor test") {
val cache = new BoundedMemoryCache(40)
expect(40)(cache.getCapacity)
val cache = new BoundedMemoryCache(60)
expect(60)(cache.getCapacity)
}
test("caching") {
val cache = new BoundedMemoryCache(40) {
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
val oldArch = System.setProperty("os.arch", "amd64")
val oldOops = System.setProperty("spark.test.useCompressedOops", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
val cache = new BoundedMemoryCache(60) {
// TODO: sorry about this, but there is no better way to skip 'cacheTracker.dropEntry'
override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
}
}
//should be OK
expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
expect(CachePutSuccess(56))(cache.put("1", 0, "Meh"))
//we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
//cache because it's from the same dataset
expect(CachePutFailure())(cache.put("1", 1, "Meh"))
//should be OK, dataset '1' can be evicted from cache
expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
expect(CachePutSuccess(56))(cache.put("2", 0, "Meh"))
//should fail, cache should obey it's capacity
expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
if (oldArch != null) {
System.setProperty("os.arch", oldArch)
} else {
System.clearProperty("os.arch")
}
if (oldOops != null) {
System.setProperty("spark.test.useCompressedOops", oldOops)
} else {
System.clearProperty("spark.test.useCompressedOops")
}
}
}

View file

@ -1,6 +1,8 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
class DummyClass1 {}
@ -17,61 +19,114 @@ class DummyClass4(val d: DummyClass3) {
val x: Int = 0
}
class SizeEstimatorSuite extends FunSuite {
class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
var oldArch: String = _
var oldOops: String = _
override def beforeAll() {
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
}
override def afterAll() {
resetOrClear("os.arch", oldArch)
resetOrClear("spark.test.useCompressedOops", oldOops)
}
test("simple classes") {
expect(8)(SizeEstimator.estimate(new DummyClass1))
expect(12)(SizeEstimator.estimate(new DummyClass2))
expect(20)(SizeEstimator.estimate(new DummyClass3))
expect(16)(SizeEstimator.estimate(new DummyClass4(null)))
expect(36)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
expect(16)(SizeEstimator.estimate(new DummyClass1))
expect(16)(SizeEstimator.estimate(new DummyClass2))
expect(24)(SizeEstimator.estimate(new DummyClass3))
expect(24)(SizeEstimator.estimate(new DummyClass4(null)))
expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
test("strings") {
expect(24)(SizeEstimator.estimate(""))
expect(26)(SizeEstimator.estimate("a"))
expect(28)(SizeEstimator.estimate("ab"))
expect(40)(SizeEstimator.estimate("abcdefgh"))
expect(48)(SizeEstimator.estimate(""))
expect(56)(SizeEstimator.estimate("a"))
expect(56)(SizeEstimator.estimate("ab"))
expect(64)(SizeEstimator.estimate("abcdefgh"))
}
test("primitive arrays") {
expect(10)(SizeEstimator.estimate(new Array[Byte](10)))
expect(20)(SizeEstimator.estimate(new Array[Char](10)))
expect(20)(SizeEstimator.estimate(new Array[Short](10)))
expect(40)(SizeEstimator.estimate(new Array[Int](10)))
expect(80)(SizeEstimator.estimate(new Array[Long](10)))
expect(40)(SizeEstimator.estimate(new Array[Float](10)))
expect(80)(SizeEstimator.estimate(new Array[Double](10)))
expect(4000)(SizeEstimator.estimate(new Array[Int](1000)))
expect(8000)(SizeEstimator.estimate(new Array[Long](1000)))
expect(32)(SizeEstimator.estimate(new Array[Byte](10)))
expect(40)(SizeEstimator.estimate(new Array[Char](10)))
expect(40)(SizeEstimator.estimate(new Array[Short](10)))
expect(56)(SizeEstimator.estimate(new Array[Int](10)))
expect(96)(SizeEstimator.estimate(new Array[Long](10)))
expect(56)(SizeEstimator.estimate(new Array[Float](10)))
expect(96)(SizeEstimator.estimate(new Array[Double](10)))
expect(4016)(SizeEstimator.estimate(new Array[Int](1000)))
expect(8016)(SizeEstimator.estimate(new Array[Long](1000)))
}
test("object arrays") {
// Arrays containing nulls should just have one pointer per element
expect(40)(SizeEstimator.estimate(new Array[String](10)))
expect(40)(SizeEstimator.estimate(new Array[AnyRef](10)))
expect(56)(SizeEstimator.estimate(new Array[String](10)))
expect(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
// For object arrays with non-null elements, each object should take one pointer plus
// however many bytes that class takes. (Note that Array.fill calls the code in its
// second parameter separately for each object, so we get distinct objects.)
expect(120)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
expect(160)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
expect(240)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
expect(12 + 16)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
expect(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
expect(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
// Past size 100, we sample only 100 elements, but we should still get the right size.
expect(24000)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
expect(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
expect(48)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
expect(408)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
expect(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
expect(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
// Same thing with huge array containing the same element many times. Note that this won't
// return exactly 4008 because it can't tell that *all* the elements will equal the first
// return exactly 4032 because it can't tell that *all* the elements will equal the first
// one it samples, but it should be close to that.
// TODO: If we sample 100 elements, this should always be 4176 ?
val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000")
assert(estimatedSize <= 4100, "Estimated size " + estimatedSize + " should be less than 4100")
assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4200")
}
test("32-bit arch") {
val arch = System.setProperty("os.arch", "x86")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
expect(40)(SizeEstimator.estimate(""))
expect(48)(SizeEstimator.estimate("a"))
expect(48)(SizeEstimator.estimate("ab"))
expect(56)(SizeEstimator.estimate("abcdefgh"))
resetOrClear("os.arch", arch)
}
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
expect(64)(SizeEstimator.estimate(""))
expect(72)(SizeEstimator.estimate("a"))
expect(72)(SizeEstimator.estimate("ab"))
expect(80)(SizeEstimator.estimate("abcdefgh"))
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)
}
def resetOrClear(prop: String, oldValue: String) {
if (oldValue != null) {
System.setProperty(prop, oldValue)
} else {
System.clearProperty(prop)
}
}
}

View file

@ -6,17 +6,27 @@ import akka.actor._
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester
import spark.KryoSerializer
import spark.SizeEstimator
import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter {
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = _
var oldOops: String = _
before {
actorSystem = ActorSystem("test")
master = new BlockManagerMaster(actorSystem, true, true)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
after {
@ -24,6 +34,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
actorSystem.awaitTermination()
actorSystem = null
master = null
if (oldArch != null) {
System.setProperty("os.arch", oldArch)
} else {
System.clearProperty("os.arch")
}
if (oldOops != null) {
System.setProperty("spark.test.useCompressedOops", oldOops)
} else {
System.clearProperty("spark.test.useCompressedOops")
}
}
test("manager-master interaction") {
@ -57,7 +79,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("in-memory LRU storage") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@ -78,7 +100,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("in-memory LRU storage with serialization") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@ -99,7 +121,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("on-disk storage") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@ -112,7 +134,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("disk and memory storage") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@ -126,7 +148,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("disk and memory storage with serialization") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@ -140,7 +162,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("LRU with mixed storage levels") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@ -166,7 +188,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("in-memory LRU with streams") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@ -192,7 +214,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
}
test("LRU with mixed storage levels and streams") {
val store = new BlockManager(master, new KryoSerializer, 1000)
val store = new BlockManager(master, new KryoSerializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))