[SPARK-2405][SQL] Reusue same byte buffers when creating new instance of InMemoryRelation

Reuse byte buffers when creating unique attributes for multiple instances of an InMemoryRelation in a single query plan.

Author: Michael Armbrust <michael@databricks.com>

Closes #1332 from marmbrus/doubleCache and squashes the following commits:

4a19609 [Michael Armbrust] Clean up concurrency story by calculating buffersn the constructor.
b39c931 [Michael Armbrust] Allocations are kind of a side effect.
f67eff7 [Michael Armbrust] Reusue same byte buffers when creating new instance of InMemoryRelation
This commit is contained in:
Michael Armbrust 2014-07-12 12:13:32 -07:00 committed by Reynold Xin
parent 7e26b57615
commit 1a7d7cc85f
2 changed files with 25 additions and 12 deletions

View file

@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
* of itself with globally unique expression ids.
*/
trait MultiInstanceRelation {
def newInstance: this.type
def newInstance(): this.type
}
/**

View file

@ -17,6 +17,9 @@
package org.apache.spark.sql.columnar
import java.nio.ByteBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@ -26,22 +29,19 @@ import org.apache.spark.SparkConf
object InMemoryRelation {
def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
new InMemoryRelation(child.output, useCompression, child)
new InMemoryRelation(child.output, useCompression, child)()
}
private[sql] case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
child: SparkPlan)
(private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
extends LogicalPlan with MultiInstanceRelation {
override def children = Seq.empty
override def references = Set.empty
override def newInstance() =
new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
lazy val cachedColumnBuffers = {
// If the cached column buffers were not passed in, we calculate them in the constructor.
// As in Spark, the actual work of caching is lazy.
if (_cachedColumnBuffers == null) {
val output = child.output
val cached = child.execute().mapPartitions { iterator =>
val columnBuilders = output.map { attribute =>
@ -62,10 +62,23 @@ private[sql] case class InMemoryRelation(
}.cache()
cached.setName(child.toString)
// Force the materialization of the cached RDD.
cached.count()
cached
_cachedColumnBuffers = cached
}
override def children = Seq.empty
override def references = Set.empty
override def newInstance() = {
new InMemoryRelation(
output.map(_.newInstance),
useCompression,
child)(
_cachedColumnBuffers).asInstanceOf[this.type]
}
def cachedColumnBuffers = _cachedColumnBuffers
}
private[sql] case class InMemoryColumnarTableScan(