[SPARK-15322][SQL][FOLLOW-UP] Update deprecated accumulator usage into accumulatorV2

## What changes were proposed in this pull request?

This PR updates another usage of the deprecated `accumulableCollection` to `listAccumulator`, which the previous PR appears to have missed.

Since `ArrayBuffer[InternalRow].asJava` yields a `java.util.List[InternalRow]`, it is safe to replace the usage.

## How was this patch tested?

This patch was verified by the related existing tests, `InMemoryColumnarQuerySuite` and `CachedTableSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13187 from HyukjinKwon/SPARK-15322.
This commit is contained in:
hyukjinkwon 2016-05-19 11:54:50 -07:00 committed by Andrew Or
parent faafd1e9db
commit f5065abf49

View file

@ -17,11 +17,11 @@
package org.apache.spark.sql.execution.columnar
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import org.apache.commons.lang.StringUtils
import org.apache.spark.{Accumulable, Accumulator}
import org.apache.spark.Accumulator
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.{AccumulatorContext, ListAccumulator}
private[sql] object InMemoryRelation {
@ -67,14 +67,14 @@ private[sql] case class InMemoryRelation(
tableName: Option[String])(
@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
@transient private[sql] var _statistics: Statistics = null,
private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
extends logical.LeafNode with MultiInstanceRelation {
override def producedAttributes: AttributeSet = outputSet
private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
private[sql] val batchStats: ListAccumulator[InternalRow] =
if (_batchStats == null) {
child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
child.sqlContext.sparkContext.listAccumulator[InternalRow]
} else {
_batchStats
}
@ -87,7 +87,7 @@ private[sql] case class InMemoryRelation(
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
partitionStatistics.schema)
batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
}
// Statistics propagation contracts:
@ -169,7 +169,7 @@ private[sql] case class InMemoryRelation(
val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
.flatMap(_.values))
batchStats += stats
batchStats.add(stats)
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)