[SPARK-11792] [SQL] [FOLLOW-UP] Change SizeEstimation to KnownSizeEstimation and make estimatedSize return Long instead of Option[Long]
https://issues.apache.org/jira/browse/SPARK-11792 The main changes include: * Renaming `SizeEstimation` to `KnownSizeEstimation`. Hopefully this new name has more information. * Making `estimatedSize` return `Long` instead of `Option[Long]`. * In `UnsafeHashedRelation`, `estimatedSize` will delegate the work to `SizeEstimator` if we have not created a `BytesToBytesMap`. Since we will put `UnsafeHashedRelation` to `BlockManager`, it is generally good to let it provide a more accurate size estimation. Also, if we do not put `BytesToBytesMap` directly into `BlockManager`, I feel it is not really necessary to make `BytesToBytesMap` extend `KnownSizeEstimation`. Author: Yin Huai <yhuai@databricks.com> Closes #9813 from yhuai/SPARK-11792-followup.
This commit is contained in:
parent
90a7519daa
commit
6f99522d13
|
@ -36,9 +36,14 @@ import org.apache.spark.util.collection.OpenHashSet
|
||||||
* When a class extends it, [[SizeEstimator]] will query the `estimatedSize` first.
|
* When a class extends it, [[SizeEstimator]] will query the `estimatedSize` first.
|
||||||
* If `estimatedSize` does not return [[None]], [[SizeEstimator]] will use the returned size
|
* If `estimatedSize` does not return [[None]], [[SizeEstimator]] will use the returned size
|
||||||
* as the size of the object. Otherwise, [[SizeEstimator]] will do the estimation work.
|
* as the size of the object. Otherwise, [[SizeEstimator]] will do the estimation work.
|
||||||
|
* The difference between a [[KnownSizeEstimation]] and
|
||||||
|
* [[org.apache.spark.util.collection.SizeTracker]] is that, a
|
||||||
|
* [[org.apache.spark.util.collection.SizeTracker]] still uses [[SizeEstimator]] to
|
||||||
|
* estimate the size. However, a [[KnownSizeEstimation]] can provide a better estimation without
|
||||||
|
* using [[SizeEstimator]].
|
||||||
*/
|
*/
|
||||||
private[spark] trait SizeEstimation {
|
private[spark] trait KnownSizeEstimation {
|
||||||
def estimatedSize: Option[Long]
|
def estimatedSize: Long
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -209,18 +214,15 @@ object SizeEstimator extends Logging {
|
||||||
// the size estimator since it references the whole REPL. Do nothing in this case. In
|
// the size estimator since it references the whole REPL. Do nothing in this case. In
|
||||||
// general all ClassLoaders and Classes will be shared between objects anyway.
|
// general all ClassLoaders and Classes will be shared between objects anyway.
|
||||||
} else {
|
} else {
|
||||||
val estimatedSize = obj match {
|
obj match {
|
||||||
case s: SizeEstimation => s.estimatedSize
|
case s: KnownSizeEstimation =>
|
||||||
case _ => None
|
state.size += s.estimatedSize
|
||||||
}
|
case _ =>
|
||||||
if (estimatedSize.isDefined) {
|
val classInfo = getClassInfo(cls)
|
||||||
state.size += estimatedSize.get
|
state.size += alignSize(classInfo.shellSize)
|
||||||
} else {
|
for (field <- classInfo.pointerFields) {
|
||||||
val classInfo = getClassInfo(cls)
|
state.enqueue(field.get(obj))
|
||||||
state.size += alignSize(classInfo.shellSize)
|
}
|
||||||
for (field <- classInfo.pointerFields) {
|
|
||||||
state.enqueue(field.get(obj))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,16 +60,10 @@ class DummyString(val arr: Array[Char]) {
|
||||||
@transient val hash32: Int = 0
|
@transient val hash32: Int = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
class DummyClass8 extends SizeEstimation {
|
class DummyClass8 extends KnownSizeEstimation {
|
||||||
val x: Int = 0
|
val x: Int = 0
|
||||||
|
|
||||||
override def estimatedSize: Option[Long] = Some(2015)
|
override def estimatedSize: Long = 2015
|
||||||
}
|
|
||||||
|
|
||||||
class DummyClass9 extends SizeEstimation {
|
|
||||||
val x: Int = 0
|
|
||||||
|
|
||||||
override def estimatedSize: Option[Long] = None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class SizeEstimatorSuite
|
class SizeEstimatorSuite
|
||||||
|
@ -231,9 +225,5 @@ class SizeEstimatorSuite
|
||||||
// DummyClass8 provides its size estimation.
|
// DummyClass8 provides its size estimation.
|
||||||
assertResult(2015)(SizeEstimator.estimate(new DummyClass8))
|
assertResult(2015)(SizeEstimator.estimate(new DummyClass8))
|
||||||
assertResult(20206)(SizeEstimator.estimate(Array.fill(10)(new DummyClass8)))
|
assertResult(20206)(SizeEstimator.estimate(Array.fill(10)(new DummyClass8)))
|
||||||
|
|
||||||
// DummyClass9 does not provide its size estimation.
|
|
||||||
assertResult(16)(SizeEstimator.estimate(new DummyClass9))
|
|
||||||
assertResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass9)))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
|
||||||
import org.apache.spark.unsafe.Platform
|
import org.apache.spark.unsafe.Platform
|
||||||
import org.apache.spark.unsafe.map.BytesToBytesMap
|
import org.apache.spark.unsafe.map.BytesToBytesMap
|
||||||
import org.apache.spark.unsafe.memory.MemoryLocation
|
import org.apache.spark.unsafe.memory.MemoryLocation
|
||||||
import org.apache.spark.util.{SizeEstimation, Utils}
|
import org.apache.spark.util.{SizeEstimator, KnownSizeEstimation, Utils}
|
||||||
import org.apache.spark.util.collection.CompactBuffer
|
import org.apache.spark.util.collection.CompactBuffer
|
||||||
import org.apache.spark.{SparkConf, SparkEnv}
|
import org.apache.spark.{SparkConf, SparkEnv}
|
||||||
|
|
||||||
|
@ -190,7 +190,7 @@ private[execution] object HashedRelation {
|
||||||
private[joins] final class UnsafeHashedRelation(
|
private[joins] final class UnsafeHashedRelation(
|
||||||
private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]])
|
private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]])
|
||||||
extends HashedRelation
|
extends HashedRelation
|
||||||
with SizeEstimation
|
with KnownSizeEstimation
|
||||||
with Externalizable {
|
with Externalizable {
|
||||||
|
|
||||||
private[joins] def this() = this(null) // Needed for serialization
|
private[joins] def this() = this(null) // Needed for serialization
|
||||||
|
@ -217,8 +217,12 @@ private[joins] final class UnsafeHashedRelation(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def estimatedSize: Option[Long] = {
|
override def estimatedSize: Long = {
|
||||||
Option(binaryMap).map(_.getTotalMemoryConsumption)
|
if (binaryMap != null) {
|
||||||
|
binaryMap.getTotalMemoryConsumption
|
||||||
|
} else {
|
||||||
|
SizeEstimator.estimate(hashTable)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def get(key: InternalRow): Seq[InternalRow] = {
|
override def get(key: InternalRow): Seq[InternalRow] = {
|
||||||
|
|
Loading…
Reference in a new issue