[SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2213 from liancheng/spark-3320 and squashes the following commits:

45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite
f67067d [Cheng Lian] Fixed SPARK-3320
Authored by Cheng Lian on 2014-08-29 18:16:47 -07:00, committed by Michael Armbrust
commit 32b18dd52c (parent 13901764f4)
3 changed files with 42 additions and 37 deletions

InMemoryColumnarTableScan.scala

@@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan(
   override def execute() = {
     relation.cachedColumnBuffers.mapPartitions { iterator =>
       // Find the ordinals of the requested columns. If none are requested, use the first.
-      val requestedColumns =
-        if (attributes.isEmpty) {
-          Seq(0)
-        } else {
-          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
-        }
-
-      new Iterator[Row] {
-        private[this] var columnBuffers: Array[ByteBuffer] = null
-        private[this] var columnAccessors: Seq[ColumnAccessor] = null
-        nextBatch()
-
-        private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
-
-        def nextBatch() = {
-          columnBuffers = iterator.next()
-          columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
-        }
-
-        override def next() = {
-          if (!columnAccessors.head.hasNext) {
-            nextBatch()
-          }
-
-          var i = 0
-          while (i < nextRow.length) {
-            columnAccessors(i).extractTo(nextRow, i)
-            i += 1
-          }
-          nextRow
-        }
-
-        override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
-      }
+      val requestedColumns = if (attributes.isEmpty) {
+        Seq(0)
+      } else {
+        attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+      }
+
+      iterator
+        .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))
+        .flatMap { columnAccessors =>
+          val nextRow = new GenericMutableRow(columnAccessors.length)
+          new Iterator[Row] {
+            override def next() = {
+              var i = 0
+              while (i < nextRow.length) {
+                columnAccessors(i).extractTo(nextRow, i)
+                i += 1
+              }
+              nextRow
+            }
+
+            override def hasNext = columnAccessors.head.hasNext
+          }
+        }
     }
   }
 }
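Why this fixes SPARK-3320: the old anonymous `Iterator[Row]` called `nextBatch()` from its constructor, and `nextBatch()` calls `iterator.next()` unconditionally, so merely constructing the iterator for an empty partition threw a `NoSuchElementException`. The rewrite routes everything through `map`/`flatMap`, which only touch the underlying batch iterator lazily, so an empty partition simply yields zero rows. A minimal self-contained sketch of the two patterns (plain Scala, not the Spark classes above; names are illustrative):

import scala.util.Try

object EmptyPartitionSketch extends App {
  // Old pattern: the constructor eagerly pulls the first batch.
  def eagerRows(batches: Iterator[Array[Int]]): Iterator[Int] =
    new Iterator[Int] {
      private var batch = batches.next() // throws NoSuchElementException when there are no batches
      private var i = 0
      def hasNext = i < batch.length || batches.hasNext
      def next() = {
        if (i == batch.length) { batch = batches.next(); i = 0 }
        val v = batch(i); i += 1; v
      }
    }

  // New pattern: flatMap touches the underlying iterator lazily,
  // so an empty input simply produces an empty output iterator.
  def lazyRows(batches: Iterator[Array[Int]]): Iterator[Int] =
    batches.flatMap(_.iterator)

  println(lazyRows(Iterator.empty).toList)          // List() -- an empty partition is fine
  println(Try(eagerRows(Iterator.empty)).isFailure) // true   -- construction itself already failed
}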

TestData.scala

@@ -151,4 +151,9 @@ object TestData {
       TimestampField(new Timestamp(i))
     })
   timestamps.registerTempTable("timestamps")
+
+  case class IntField(i: Int)
+  // An RDD with 4 elements and 8 partitions
+  val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8)
+  withEmptyParts.registerTempTable("withEmptyParts")
 }
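`parallelize` slices the 4-element sequence into 8 partitions by position, so half of the partitions end up empty, which is exactly the shape that triggered the bug. A hedged sketch of verifying that (assumes a local `SparkContext` named `sc`, e.g. in spark-shell; the printed sizes are illustrative):

// Count the elements in each partition of a 4-element, 8-slice RDD.
val rdd = sc.parallelize(1 to 4, 8)
val sizes = rdd.mapPartitions(it => Iterator(it.size)).collect()
println(sizes.mkString(", ")) // e.g. 0, 1, 0, 1, 0, 1, 0, 1 -- four empty partitions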

InMemoryColumnarQuerySuite.scala

@@ -17,14 +17,13 @@

 package org.apache.spark.sql.columnar

-import org.apache.spark.sql.{QueryTest, TestData}
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{SQLConf, QueryTest, TestData}

 class InMemoryColumnarQuerySuite extends QueryTest {
-  import TestData._
-  import TestSQLContext._
+  import org.apache.spark.sql.TestData._
+  import org.apache.spark.sql.test.TestSQLContext._

   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
@@ -93,4 +92,16 @@ class InMemoryColumnarQuerySuite extends QueryTest {
       sql("SELECT time FROM timestamps"),
       timestamps.collect().toSeq)
   }
+
+  test("SPARK-3320 regression: batched column buffer building should work with empty partitions") {
+    checkAnswer(
+      sql("SELECT * FROM withEmptyParts"),
+      withEmptyParts.collect().toSeq)
+
+    TestSQLContext.cacheTable("withEmptyParts")
+    checkAnswer(
+      sql("SELECT * FROM withEmptyParts"),
+      withEmptyParts.collect().toSeq)
+  }
 }
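The second `checkAnswer` is the one that exercises the fix: `cacheTable` switches the scan to `InMemoryColumnarTableScan`, whose batched buffer building previously blew up on the empty partitions. A hedged spark-shell reproduction of the original failure (Spark 1.1-era API; the names and setup are illustrative, not taken from the commit):

// spark-shell sketch; `sc` is the shell's SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class IntField(i: Int)
val rdd = sc.parallelize((1 to 4).map(IntField), 8)
rdd.registerTempTable("withEmptyParts")

sqlContext.cacheTable("withEmptyParts")
// Pre-fix: scanning the cached table threw a NoSuchElementException
// from the eager nextBatch() call; post-fix it returns the four rows.
sqlContext.sql("SELECT * FROM withEmptyParts").collect().foreach(println)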