[SPARK-6030] [CORE] Using simulated field layout method to compute class shellSize

SizeEstimator gives wrong result for Integer on 64bit JVM with UseCompressedOops on, this pr fixes that. For more details, please refer [SPARK-6030](https://issues.apache.org/jira/browse/SPARK-6030)
sryza, I noticed there is a pr to expose SizeEstimator, maybe that should be waited by this pr get merged if we confirm this problem.
And shivaram would you mind to review this pr since you contribute related code. Also cc to srowen and mateiz

Author: Ye Xianjin <advancedxy@gmail.com>

Closes #4783 from advancedxy/SPARK-6030 and squashes the following commits:

c4dcb41 [Ye Xianjin] Add super.beforeEach in the beforeEach method to make the trait stackable.. Remove useless leading whitespace.
3f80640 [Ye Xianjin] The size of Integer class changes from 24 to 16 on a 64-bit JVM with -UseCompressedOops flag on after the fix. I don't how 100000 was originally calculated, It looks like 100000 is the magic number which makes sure spilling. Because of the size change, It fails because there is no spilling at all. Change the number to a slightly larger number fixes that.
e849d2d [Ye Xianjin] Merge two shellSize assignments into one. Add some explanation to alignSizeUp method.
85a0b51 [Ye Xianjin] Fix typos and update wording in comments. Using alignSizeUp to compute alignSize.
d27eb77 [Ye Xianjin] Add some detailed comments in the code. Add some test cases. It's very difficult to design test cases as the final object alignment will hide a lot of filed layout details if we just considering the whole size.
842aed1 [Ye Xianjin] primitiveSize(cls) can just return Int. Use a simplified class field layout method to calculate class instance size. Will add more documents and test cases. Add a new alignSizeUp function which uses bitwise operators to speedup.
62e8ab4 [Ye Xianjin] Don't alignSize for objects' shellSize, alignSize when added to state.size. Add some primitive wrapper objects size tests.
This commit is contained in:
Ye Xianjin 2015-05-02 23:08:09 +01:00 committed by Sean Owen
parent da303526e5
commit bfcd528d6f
3 changed files with 100 additions and 18 deletions

View file

@ -47,6 +47,11 @@ private[spark] object SizeEstimator extends Logging {
private val FLOAT_SIZE = 4
private val DOUBLE_SIZE = 8
// Fields can be primitive types, sizes are: 1, 2, 4, 8. Or fields can be pointers. The size of
// a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and UseCompressedOops flag.
// The sizes should be in descending order, as we will use that information for fields placement.
private val fieldSizes = List(8, 4, 2, 1)
// Alignment boundary for objects
// TODO: Is this arch dependent ?
private val ALIGN_SIZE = 8
@ -171,7 +176,7 @@ private[spark] object SizeEstimator extends Logging {
// general all ClassLoaders and Classes will be shared between objects anyway.
} else {
val classInfo = getClassInfo(cls)
state.size += classInfo.shellSize
state.size += alignSize(classInfo.shellSize)
for (field <- classInfo.pointerFields) {
state.enqueue(field.get(obj))
}
@ -237,8 +242,8 @@ private[spark] object SizeEstimator extends Logging {
}
size
}
private def primitiveSize(cls: Class[_]): Long = {
private def primitiveSize(cls: Class[_]): Int = {
if (cls == classOf[Byte]) {
BYTE_SIZE
} else if (cls == classOf[Boolean]) {
@ -274,21 +279,50 @@ private[spark] object SizeEstimator extends Logging {
val parent = getClassInfo(cls.getSuperclass)
var shellSize = parent.shellSize
var pointerFields = parent.pointerFields
val sizeCount = Array.fill(fieldSizes.max + 1)(0)
// iterate through the fields of this class and gather information.
for (field <- cls.getDeclaredFields) {
if (!Modifier.isStatic(field.getModifiers)) {
val fieldClass = field.getType
if (fieldClass.isPrimitive) {
shellSize += primitiveSize(fieldClass)
sizeCount(primitiveSize(fieldClass)) += 1
} else {
field.setAccessible(true) // Enable future get()'s on this field
shellSize += pointerSize
sizeCount(pointerSize) += 1
pointerFields = field :: pointerFields
}
}
}
shellSize = alignSize(shellSize)
// Based on the simulated field layout code in Aleksey Shipilev's report:
// http://cr.openjdk.java.net/~shade/papers/2013-shipilev-fieldlayout-latest.pdf
// The code is in Figure 9.
// The simplified idea of field layout consists of 4 parts (see more details in the report):
//
// 1. field alignment: HotSpot lays out the fields aligned by their size.
// 2. object alignment: HotSpot rounds instance size up to 8 bytes
// 3. consistent fields layouts throughout the hierarchy: This means we should layout
// superclass first. And we can use superclass's shellSize as a starting point to layout the
// other fields in this class.
// 4. class alignment: HotSpot rounds field blocks up to to HeapOopSize not 4 bytes, confirmed
// with Aleksey. see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
//
// The real world field layout is much more complicated. There are three kinds of fields
// order in Java 8. And we don't consider the @contended annotation introduced by Java 8.
// see the HotSpot classloader code, layout_fields method for more details.
// hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/classFileParser.cpp
var alignedSize = shellSize
for (size <- fieldSizes if sizeCount(size) > 0) {
val count = sizeCount(size)
// If there are internal gaps, smaller field can fit in.
alignedSize = math.max(alignedSize, alignSizeUp(shellSize, size) + size * count)
shellSize += size * count
}
// Should choose a larger size to be new shellSize and clearly alignedSize >= shellSize, and
// round up the instance filed blocks
shellSize = alignSizeUp(alignedSize, pointerSize)
// Create and cache a new ClassInfo
val newInfo = new ClassInfo(shellSize, pointerFields)
@ -296,8 +330,15 @@ private[spark] object SizeEstimator extends Logging {
newInfo
}
private def alignSize(size: Long): Long = {
val rem = size % ALIGN_SIZE
if (rem == 0) size else (size + ALIGN_SIZE - rem)
}
private def alignSize(size: Long): Long = alignSizeUp(size, ALIGN_SIZE)
/**
* Compute aligned size. The alignSize must be 2^n, otherwise the result will be wrong.
* When alignSize = 2^n, alignSize - 1 = 2^n - 1. The binary representation of (alignSize - 1)
* will only have n trailing 1s(0b00...001..1). ~(alignSize - 1) will be 0b11..110..0. Hence,
* (size + alignSize - 1) & ~(alignSize - 1) will set the last n bits to zeros, which leads to
* multiple of alignSize.
*/
private def alignSizeUp(size: Long, alignSize: Int): Long =
(size + alignSize - 1) & ~(alignSize - 1)
}

View file

@ -36,6 +36,15 @@ class DummyClass4(val d: DummyClass3) {
val x: Int = 0
}
// dummy class to show class field blocks alignment.
class DummyClass5 extends DummyClass1 {
val x: Boolean = true
}
class DummyClass6 extends DummyClass5 {
val y: Boolean = true
}
object DummyString {
def apply(str: String) : DummyString = new DummyString(str.toArray)
}
@ -50,6 +59,7 @@ class SizeEstimatorSuite
override def beforeEach() {
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
super.beforeEach()
System.setProperty("os.arch", "amd64")
System.setProperty("spark.test.useCompressedOops", "true")
}
@ -62,6 +72,22 @@ class SizeEstimatorSuite
assertResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
test("primitive wrapper objects") {
assertResult(16)(SizeEstimator.estimate(new java.lang.Boolean(true)))
assertResult(16)(SizeEstimator.estimate(new java.lang.Byte("1")))
assertResult(16)(SizeEstimator.estimate(new java.lang.Character('1')))
assertResult(16)(SizeEstimator.estimate(new java.lang.Short("1")))
assertResult(16)(SizeEstimator.estimate(new java.lang.Integer(1)))
assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
assertResult(16)(SizeEstimator.estimate(new java.lang.Float(1.0)))
assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
}
test("class field blocks rounding") {
assertResult(16)(SizeEstimator.estimate(new DummyClass5))
assertResult(24)(SizeEstimator.estimate(new DummyClass6))
}
// NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
@ -102,18 +128,18 @@ class SizeEstimatorSuite
val arr = new Array[Char](100000)
assertResult(200016)(SizeEstimator.estimate(arr))
assertResult(480032)(SizeEstimator.estimate(Array.fill(10000)(new DummyString(arr))))
val buf = new ArrayBuffer[DummyString]()
for (i <- 0 until 5000) {
buf.append(new DummyString(new Array[Char](10)))
}
assertResult(340016)(SizeEstimator.estimate(buf.toArray))
for (i <- 0 until 5000) {
buf.append(new DummyString(arr))
}
assertResult(683912)(SizeEstimator.estimate(buf.toArray))
// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
// 10 pointers plus 8-byte object
@ -155,5 +181,20 @@ class SizeEstimatorSuite
assertResult(64)(SizeEstimator.estimate(DummyString("a")))
assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
// primitive wrapper classes
assertResult(24)(SizeEstimator.estimate(new java.lang.Boolean(true)))
assertResult(24)(SizeEstimator.estimate(new java.lang.Byte("1")))
assertResult(24)(SizeEstimator.estimate(new java.lang.Character('1')))
assertResult(24)(SizeEstimator.estimate(new java.lang.Short("1")))
assertResult(24)(SizeEstimator.estimate(new java.lang.Integer(1)))
assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
assertResult(24)(SizeEstimator.estimate(new java.lang.Float(1.0)))
assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
}
test("class field blocks rounding on 64-bit VM without useCompressedOops") {
assertResult(24)(SizeEstimator.estimate(new DummyClass5))
assertResult(32)(SizeEstimator.estimate(new DummyClass6))
}
}

View file

@ -377,7 +377,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(3)), Some(ord), None)
assertDidNotBypassMergeSort(sorter)
sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
sorter.stop()
assert(diskBlockManager.getAllBlocks().length === 0)
@ -385,9 +385,9 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
val sorter2 = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(3)), Some(ord), None)
assertDidNotBypassMergeSort(sorter2)
sorter2.insertAll((0 until 100000).iterator.map(i => (i, i)))
sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
sorter2.stop()
assert(diskBlockManager.getAllBlocks().length === 0)
}
@ -428,8 +428,8 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
None, Some(new HashPartitioner(3)), Some(ord), None)
assertDidNotBypassMergeSort(sorter)
intercept[SparkException] {
sorter.insertAll((0 until 100000).iterator.map(i => {
if (i == 99990) {
sorter.insertAll((0 until 120000).iterator.map(i => {
if (i == 119990) {
throw new SparkException("Intentional failure")
}
(i, i)