[SPARK-24166][SQL] InMemoryTableScanExec should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from https://github.com/apache/spark/pull/21190 , to make it easier to backport.

`InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, we can't access `conf.offHeapColumnVectorEnabled` there.

## How was this patch tested?

It is tested in #21190.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21223 from cloud-fan/minor1.
This commit is contained in:
Wenchen Fan 2018-05-03 19:56:30 +08:00
parent 417ad92502
commit 991b526992

View file

@@ -78,10 +78,12 @@ case class InMemoryTableScanExec(
private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))
-  private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
+  private def createAndDecompressColumn(
+      cachedColumnarBatch: CachedBatch,
+      offHeapColumnVectorEnabled: Boolean): ColumnarBatch = {
val rowCount = cachedColumnarBatch.numRows
val taskContext = Option(TaskContext.get())
-    val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
+    val columnVectors = if (!offHeapColumnVectorEnabled || taskContext.isEmpty) {
OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
} else {
OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
@@ -101,10 +103,13 @@ case class InMemoryTableScanExec(
private lazy val inputRDD: RDD[InternalRow] = {
val buffers = filteredCachedBatches()
+    val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
if (supportsBatch) {
// HACK ALERT: This is actually an RDD[ColumnarBatch].
// We're taking advantage of Scala's type erasure here to pass these batches along.
-      buffers.map(createAndDecompressColumn).asInstanceOf[RDD[InternalRow]]
+      buffers
+        .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
+        .asInstanceOf[RDD[InternalRow]]
} else {
val numOutputRows = longMetric("numOutputRows")