[SPARK-2650][SQL] Try to partially fix SPARK-2650 by adjusting initial buffer size and reducing memory allocation
JIRA issue: [SPARK-2650](https://issues.apache.org/jira/browse/SPARK-2650)

Please refer to [comments](https://issues.apache.org/jira/browse/SPARK-2650?focusedCommentId=14084397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14084397) of SPARK-2650 for some other details.

This PR adjusts the initial in-memory columnar buffer size to 1MB, same as the default value of Shark's `shark.column.partitionSize.mb` property when running in local mode. Will add Shark style partition size estimation in another PR.

Also, before this PR, `NullableColumnBuilder` copies the whole buffer to add the null positions section, and then `CompressibleColumnBuilder` copies and compresses the buffer again, even if compression is disabled (`PassThrough` compression scheme is used to disable compression). In this PR the first buffer copy is eliminated to reduce memory consumption.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1769 from liancheng/spark-2650 and squashes the following commits:

88a042e [Cheng Lian] Fixed method visibility and removed dead code
001f2e5 [Cheng Lian] Try fixing SPARK-2650 by adjusting initial buffer size and reducing memory allocation
Parent: d94f5990e5
Commit: d0ae3f3912
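To make the buffer-handling change concrete before reading the diff, here is a minimal, self-contained Scala sketch of the single-copy layout the commit message describes: the column header (type ID, null count, null positions) plus a 4-byte compression scheme ID and the column body are written into one freshly allocated buffer, instead of copying the column data once in `NullableColumnBuilder.build()` and again during compression. This is an illustration only, not code from the patch; the names `ColumnBufferLayoutSketch` and `assemble` are made up for the example.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical sketch: mimics the header-plus-body layout described in the
// commit message; it is not one of the actual Spark columnar classes.
object ColumnBufferLayoutSketch {
  def assemble(typeId: Int, nullCount: Int, nulls: ByteBuffer, body: ByteBuffer): ByteBuffer = {
    // Header = column type ID (4 bytes) + null count (4 bytes) + null positions,
    // followed by 4 bytes reserved for the compression scheme ID.
    val headerSize = 4 + 4 + nulls.remaining()
    val out = ByteBuffer
      .allocate(headerSize + 4 + body.remaining())
      .order(ByteOrder.nativeOrder)
      .putInt(typeId)
      .putInt(nullCount)
      .put(nulls)
      .putInt(0)   // stand-in for the compression scheme ID (e.g. PassThrough)
      .put(body)   // the column data is copied exactly once
    out.flip()
    out
  }

  def main(args: Array[String]): Unit = {
    // One null at row position 1, followed by two non-null Int values.
    val nulls = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder).putInt(1)
    nulls.flip()
    val body = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder).putInt(42).putInt(43)
    body.flip()
    val column = assemble(0, 1, nulls, body)
    println(s"assembled column buffer: ${column.remaining()} bytes") // 24 bytes
  }
}
```

In the actual diff below, `NullableColumnBuilder` gains a `buildNonNulls()` method that exposes the raw non-null buffer, and `CompressibleColumnBuilder.build()` writes the header itself before compressing into the same target buffer.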
@@ -118,7 +118,7 @@ private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
 private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
 
 private[sql] object ColumnBuilder {
-  val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
+  val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
 
   private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
     if (orig.remaining >= size) {
@@ -36,9 +36,9 @@ import org.apache.spark.sql.Row
  * }}}
  */
 private[sql] trait NullableColumnBuilder extends ColumnBuilder {
-  private var nulls: ByteBuffer = _
+  protected var nulls: ByteBuffer = _
+  protected var nullCount: Int = _
   private var pos: Int = _
-  private var nullCount: Int = _
 
   abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
     nulls = ByteBuffer.allocate(1024)
@@ -78,4 +78,9 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
     buffer.rewind()
     buffer
   }
+
+  protected def buildNonNulls(): ByteBuffer = {
+    nulls.limit(nulls.position()).rewind()
+    super.build()
+  }
 }
@@ -46,8 +46,6 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   this: NativeColumnBuilder[T] with WithCompressionSchemes =>
 
-  import CompressionScheme._
-
   var compressionEncoders: Seq[Encoder[T]] = _
 
   abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
@@ -81,28 +79,32 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
     }
   }
 
-  abstract override def build() = {
-    val rawBuffer = super.build()
+  override def build() = {
+    val nonNullBuffer = buildNonNulls()
+    val typeId = nonNullBuffer.getInt()
     val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
 
-    val headerSize = columnHeaderSize(rawBuffer)
+    // Header = column type ID + null count + null positions
+    val headerSize = 4 + 4 + nulls.limit()
     val compressedSize = if (encoder.compressedSize == 0) {
-      rawBuffer.limit - headerSize
+      nonNullBuffer.remaining()
    } else {
       encoder.compressedSize
     }
 
-    // Reserves 4 bytes for compression scheme ID
     val compressedBuffer = ByteBuffer
+      // Reserves 4 bytes for compression scheme ID
       .allocate(headerSize + 4 + compressedSize)
       .order(ByteOrder.nativeOrder)
-
-    copyColumnHeader(rawBuffer, compressedBuffer)
+      // Write the header
+      .putInt(typeId)
+      .putInt(nullCount)
+      .put(nulls)
 
     logInfo(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
-    encoder.compress(rawBuffer, compressedBuffer, columnType)
+    encoder.compress(nonNullBuffer, compressedBuffer, columnType)
   }
 }
@@ -67,22 +67,6 @@ private[sql] object CompressionScheme {
       s"Unrecognized compression scheme type ID: $typeId"))
   }
 
-  def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
-    // Writes column type ID
-    to.putInt(from.getInt())
-
-    // Writes null count
-    val nullCount = from.getInt()
-    to.putInt(nullCount)
-
-    // Writes null positions
-    var i = 0
-    while (i < nullCount) {
-      to.putInt(from.getInt())
-      i += 1
-    }
-  }
-
   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
     val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
     val nullCount = header.getInt(4)
@@ -42,4 +42,3 @@ object TestCompressibleColumnBuilder {
     builder
   }
 }
-