[SPARK-15744][SQL] Rename two TungstenAggregation*Suites and update codegen/error messages/comments
## What changes were proposed in this pull request? For consistency, this PR updates some remaining `TungstenAggregation/SortBasedAggregate` after SPARK-15728. - Update a comment in codegen in `VectorizedHashMapGenerator.scala`. - `TungstenAggregationQuerySuite` --> `HashAggregationQuerySuite` - `TungstenAggregationQueryWithControlledFallbackSuite` --> `HashAggregationQueryWithControlledFallbackSuite` - Update two error messages in `SQLQuerySuite.scala` and `AggregationQuerySuite.scala`. - Update several comments. ## How was this patch tested? Manual (Only comment changes and test suite renamings). Author: Dongjoon Hyun <dongjoon@apache.org> Closes #13487 from dongjoon-hyun/SPARK-15744.
This commit is contained in:
parent
f7288e166c
commit
b9fcfb3bd1
|
@ -30,7 +30,7 @@ import static org.apache.spark.sql.types.DataTypes.LongType;
|
|||
* This is an illustrative implementation of an append-only single-key/single value aggregate hash
|
||||
* map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates
|
||||
* (and fall back to the `BytesToBytesMap` if a given key isn't found). This can be potentially
|
||||
* 'codegened' in TungstenAggregate to speed up aggregates w/ key.
|
||||
* 'codegened' in HashAggregate to speed up aggregates w/ key.
|
||||
*
|
||||
* It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the
|
||||
* key-value pairs. The index lookups in the array rely on linear probing (with a small number of
|
||||
|
|
|
@ -82,7 +82,7 @@ object AggUtils {
|
|||
aggregateExpressions: Seq[AggregateExpression],
|
||||
resultExpressions: Seq[NamedExpression],
|
||||
child: SparkPlan): Seq[SparkPlan] = {
|
||||
// Check if we can use TungstenAggregate.
|
||||
// Check if we can use HashAggregate.
|
||||
|
||||
// 1. Create an Aggregate Operator for partial aggregations.
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ abstract class AggregationIterator(
|
|||
* - PartialMerge (for single distinct)
|
||||
* - Partial and PartialMerge (for single distinct)
|
||||
* - Final
|
||||
* - Complete (for SortBasedAggregate with functions that does not support Partial)
|
||||
* - Complete (for SortAggregate with functions that do not support Partial)
|
||||
* - Final and Complete (currently not used)
|
||||
*
|
||||
* TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression
|
||||
|
|
|
@ -458,7 +458,7 @@ case class HashAggregateExec(
|
|||
}
|
||||
|
||||
/**
|
||||
* Using the vectorized hash map in TungstenAggregate is currently supported for all primitive
|
||||
* Using the vectorized hash map in HashAggregate is currently supported for all primitive
|
||||
* data types during partial aggregation. However, we currently only enable the hash map for a
|
||||
* subset of cases that've been verified to show performance improvements on our benchmarks
|
||||
* subject to an internal conf that sets an upper limit on the maximum length of the aggregate
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
|
|||
/**
|
||||
* This is a helper class to generate an append-only vectorized hash map that can act as a 'cache'
|
||||
* for extremely fast key-value lookups while evaluating aggregates (and fall back to the
|
||||
* `BytesToBytesMap` if a given key isn't found). This is 'codegened' in TungstenAggregate to speed
|
||||
* `BytesToBytesMap` if a given key isn't found). This is 'codegened' in HashAggregate to speed
|
||||
* up aggregates w/ key.
|
||||
*
|
||||
* It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the
|
||||
|
@ -127,7 +127,7 @@ class VectorizedHashMapGenerator(
|
|||
| public $generatedClassName() {
|
||||
| batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
|
||||
| org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
|
||||
| // TODO: Possibly generate this projection in TungstenAggregate directly
|
||||
| // TODO: Possibly generate this projection in HashAggregate directly
|
||||
| aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
|
||||
| aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
|
||||
| for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
|
||||
|
|
|
@ -251,7 +251,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|
|||
if (!hasGeneratedAgg) {
|
||||
fail(
|
||||
s"""
|
||||
|Codegen is enabled, but query $sqlText does not have TungstenAggregate in the plan.
|
||||
|Codegen is enabled, but query $sqlText does not have HashAggregate in the plan.
|
||||
|${df.queryExecution.simpleString}
|
||||
""".stripMargin)
|
||||
}
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.execution.metric
|
||||
|
||||
|
@ -135,8 +135,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
|
|||
|
||||
test("Aggregate metrics") {
|
||||
// Assume the execution plan is
|
||||
// ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
|
||||
// -> TungstenAggregate(nodeId = 0)
|
||||
// ... -> HashAggregate(nodeId = 2) -> Exchange(nodeId = 1)
|
||||
// -> HashAggregate(nodeId = 0)
|
||||
val df = testData2.groupBy().count() // 2 partitions
|
||||
testSparkPlanMetrics(df, 1, Map(
|
||||
2L -> ("HashAggregate", Map("number of output rows" -> 2L)),
|
||||
|
|
|
@ -869,10 +869,10 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
|
|||
DateType, TimestampType,
|
||||
ArrayType(IntegerType), MapType(StringType, LongType), struct,
|
||||
new UDT.MyDenseVectorUDT())
|
||||
// Right now, we will use SortBasedAggregate to handle UDAFs.
|
||||
// UnsafeRow.mutableFieldTypes.asScala.toSeq will trigger SortBasedAggregate to use
|
||||
// Right now, we will use SortAggregate to handle UDAFs.
|
||||
// UnsafeRow.mutableFieldTypes.asScala.toSeq will trigger SortAggregate to use
|
||||
// UnsafeRow as the aggregation buffer. Meanwhile, dataTypes will trigger
|
||||
// SortBasedAggregate to use a safe row as the aggregation buffer.
|
||||
// SortAggregate to use a safe row as the aggregation buffer.
|
||||
Seq(dataTypes, UnsafeRow.mutableFieldTypes.asScala.toSeq).foreach { dataTypes =>
|
||||
val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
|
||||
StructField(s"col$index", dataType, nullable = true)
|
||||
|
@ -992,10 +992,10 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
|
|||
}
|
||||
|
||||
|
||||
class TungstenAggregationQuerySuite extends AggregationQuerySuite
|
||||
class HashAggregationQuerySuite extends AggregationQuerySuite
|
||||
|
||||
|
||||
class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
|
||||
class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
|
||||
|
||||
override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
|
||||
Seq(0, 10).foreach { maxColumnarHashMapColumns =>
|
||||
|
@ -1013,7 +1013,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
|
|||
case Some(errorMessage) =>
|
||||
val newErrorMessage =
|
||||
s"""
|
||||
|The following aggregation query failed when using TungstenAggregate with
|
||||
|The following aggregation query failed when using HashAggregate with
|
||||
|controlled fallback (it falls back to bytes to bytes map once it has processed
|
||||
|${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has
|
||||
|processed $fallbackStartsAt input rows). The query is ${actual.queryExecution}
|
||||
|
|
Loading…
Reference in a new issue