[SPARK-16165][SQL] Fix the update logic for InMemoryTableScanExec.readBatches

## What changes were proposed in this pull request?

Currently, the `readBatches` accumulator of `InMemoryTableScanExec` is updated only when `spark.sql.inMemoryColumnarStorage.partitionPruning` is true. Although this metric is used only for testing purposes, it should report a correct value regardless of the SQL options in effect.

## How was this patch tested?

Pass the Jenkins tests (including a new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13870 from dongjoon-hyun/SPARK-16165.
This commit is contained in:
Dongjoon Hyun 2016-06-24 07:19:20 +08:00 committed by Cheng Lian
parent 0e4bdebece
commit 264bc63623
2 changed files with 18 additions and 3 deletions

View file

@@ -147,9 +147,6 @@ private[sql] case class InMemoryTableScanExec(
logInfo(s"Skipping partition based on stats $statsString")
false
} else {
if (enableAccumulators) {
readBatches.add(1)
}
true
}
}
@@ -159,6 +156,9 @@ private[sql] case class InMemoryTableScanExec(
// update SQL metrics
val withMetrics = cachedBatchesToScan.map { batch =>
if (enableAccumulators) {
readBatches.add(1)
}
numOutputRows += batch.numRows
batch
}

View file

@@ -119,6 +119,21 @@ class PartitionBatchPruningSuite
}
}
// With the IN_MEMORY_PARTITION_PRUNING option disabled
test("disable IN_MEMORY_PARTITION_PRUNING") {
// Remember the current setting so it can be restored afterwards; otherwise the
// disabled-pruning configuration would leak into subsequent tests in this suite.
val originalPruning = spark.conf.get(SQLConf.IN_MEMORY_PARTITION_PRUNING.key)
spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)
try {
val df = sql("SELECT key FROM pruningData WHERE key = 1")
val result = df.collect().map(_(0)).toArray
assert(result.length === 1)
// With partition pruning disabled, every partition and every cached batch must
// be scanned, so the accumulators should report the full counts.
val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value)
}.head
assert(readPartitions === 5)
assert(readBatches === 10)
} finally {
spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, originalPruning)
}
}
def checkBatchPruning(
query: String,
expectedReadPartitions: Int,