[SPARK-22827][SQL][FOLLOW-UP] Throw SparkOutOfMemoryError in HashAggregateExec, too.

## What changes were proposed in this pull request?

This is a follow-up PR to #20014, which introduced `SparkOutOfMemoryError` to avoid killing the entire executor when an `OutOfMemoryError` is thrown.
We should throw `SparkOutOfMemoryError` in `HashAggregateExec`, too.
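
For context, here is a minimal sketch of the behavioral difference (not the actual `HashAggregateExec` code; the allocation helper is hypothetical):

```scala
import org.apache.spark.memory.SparkOutOfMemoryError

object AggregationSketch {
  // Hypothetical allocation helper: returns null when no memory page
  // can be acquired for the aggregation hash map.
  def tryAllocateBuffer(): AnyRef = null

  def findOrInsert(): AnyRef = {
    val buffer = tryAllocateBuffer()
    if (buffer == null) {
      // A plain `new OutOfMemoryError(...)` is treated as fatal by the
      // executor's uncaught-exception handler, which kills the whole JVM.
      // `SparkOutOfMemoryError` extends `OutOfMemoryError` but signals a
      // task-level failure, so only the current task is failed.
      throw new SparkOutOfMemoryError("No enough memory for aggregation")
    }
    buffer
  }
}
```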

## How was this patch tested?

Existing tests.

Closes #22969 from ueshin/issues/SPARK-22827/oome.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

```diff
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.TaskContext
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
@@ -762,6 +762,8 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }
 
+    val oomeClassName = classOf[SparkOutOfMemoryError].getName
+
     val findOrInsertRegularHashMap: String =
       s"""
          |// generate grouping key
@@ -787,7 +789,7 @@ case class HashAggregateExec(
          |    $unsafeRowKeys, ${hashEval.value});
          |  if ($unsafeRowBuffer == null) {
          |    // failed to allocate the first page
-         |    throw new OutOfMemoryError("No enough memory for aggregation");
+         |    throw new $oomeClassName("No enough memory for aggregation");
          |  }
          |}
        """.stripMargin
```