[SPARK-35881][SQL] Add support for columnar execution of final query stage in AdaptiveSparkPlanExec

### What changes were proposed in this pull request?

Changes in this PR:

- `AdaptiveSparkPlanExec` has new methods `finalPlanSupportsColumnar` and `doExecuteColumnar` to support adaptive queries where the final query stage produces columnar data.
- `SessionState` now has a new set of injectable rules named `finalQueryStagePrepRules` that can be applied to the final query stage.
- `AdaptiveSparkPlanExec` can now safely be wrapped by either `RowToColumnarExec` or `ColumnarToRowExec`.

A Spark plugin can use the new rules to remove the root `ColumnarToRowExec` transition that is inserted by previous rules and at execution time can call `finalPlanSupportsColumnar` to see if the final query stage is columnar. If the plan is columnar then the plugin can safely call `doExecuteColumnar`. The adaptive plan can be wrapped in either `RowToColumnarExec` or `ColumnarToRowExec` to force a particular output format. There are fast paths in both of these operators to avoid any redundant transitions.

### Why are the changes needed?

Without this change it is necessary to use reflection to get the final physical plan to determine whether it is columnar and to execute it is a columnar plan. `AdaptiveSparkPlanExec` only provides public methods for row-based execution.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I have manually tested this patch with the RAPIDS Accelerator for Apache Spark.

Closes #33140 from andygrove/support-columnar-adaptive.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
This commit is contained in:
Andy Grove 2021-07-30 13:21:50 -05:00 committed by Thomas Graves
parent 895e3f5e2a
commit 0f538402fb
5 changed files with 138 additions and 68 deletions

View file

@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
* <li>(External) Catalog listeners.</li>
* <li>Columnar Rules.</li>
* <li>Adaptive Query Stage Preparation Rules.</li>
* <li>Adaptive Query Post Stage Preparation Rules.</li>
* </ul>
*
* The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
@ -110,9 +111,12 @@ class SparkSessionExtensions {
type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
type ColumnarRuleBuilder = SparkSession => ColumnarRule
type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
type PostStageCreationRuleBuilder = SparkSession => Rule[SparkPlan]
private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
private[this] val queryStagePrepRuleBuilders = mutable.Buffer.empty[QueryStagePrepRuleBuilder]
private[this] val postStageCreationRuleBuilders =
mutable.Buffer.empty[PostStageCreationRuleBuilder]
/**
* Build the override rules for columnar execution.
@ -128,6 +132,14 @@ class SparkSessionExtensions {
queryStagePrepRuleBuilders.map(_.apply(session)).toSeq
}
/**
* Build the override rules for the final query stage preparation phase of adaptive query
* execution.
*/
private[sql] def buildPostStageCreationRules(session: SparkSession): Seq[Rule[SparkPlan]] = {
postStageCreationRuleBuilders.map(_.apply(session)).toSeq
}
/**
* Inject a rule that can override the columnar execution of an executor.
*/
@ -136,13 +148,21 @@ class SparkSessionExtensions {
}
/**
* Inject a rule that can override the the query stage preparation phase of adaptive query
* Inject a rule that can override the query stage preparation phase of adaptive query
* execution.
*/
def injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit = {
queryStagePrepRuleBuilders += builder
}
/**
* Inject a rule that can override the final query stage preparation phase of adaptive query
* execution.
*/
def injectPostStageCreationRule(builder: PostStageCreationRuleBuilder): Unit = {
postStageCreationRuleBuilders += builder
}
private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
/**

View file

@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._
@ -65,7 +66,9 @@ trait ColumnarToRowTransition extends UnaryExecNode
* [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
*/
case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
assert(child.supportsColumnar)
// child plan must be columnar or an adaptive plan, which could either be row-based or
// columnar, but we don't know until we execute it
assert(child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
override def output: Seq[Attribute] = child.output
@ -83,18 +86,25 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
)
override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
// This avoids calling `output` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localOutput = this.output
child.executeColumnar().mapPartitionsInternal { batches =>
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
batch.rowIterator().asScala.map(toUnsafe)
}
child match {
case a: AdaptiveSparkPlanExec if !a.finalPlanSupportsColumnar() =>
// if the child plan is adaptive and resulted in rows rather than columnar data
// then we can bypass any transition
a.execute()
case _ =>
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
// This avoids calling `output` in the RDD closure, so that we don't need to include
// the entire plan (this) in the closure.
val localOutput = this.output
child.executeColumnar().mapPartitionsInternal { batches =>
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
batch.rowIterator().asScala.map(toUnsafe)
}
}
}
}
@ -419,6 +429,10 @@ trait RowToColumnarTransition extends UnaryExecNode
* would only be to reduce code.
*/
case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
// child plan must be row-based or an adaptive plan, which could either be row-based or
// columnar, but we don't know until we execute it
assert(!child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
@ -441,52 +455,60 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
)
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
// Instead of creating a new config we are reusing columnBatchSize. In the future if we do
// combine with some of the Arrow conversion tools we will need to unify some of the configs.
val numRows = conf.columnBatchSize
// This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localSchema = this.schema
child.execute().mapPartitionsInternal { rowIterator =>
if (rowIterator.hasNext) {
new Iterator[ColumnarBatch] {
private val converters = new RowToColumnConverter(localSchema)
private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
OffHeapColumnVector.allocateColumns(numRows, localSchema)
} else {
OnHeapColumnVector.allocateColumns(numRows, localSchema)
}
private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
child match {
case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>
// if the child plan is adaptive and resulted in columnar data
// then we can bypass any transition
a.executeColumnar()
case _ =>
val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
// Instead of creating a new config we are reusing columnBatchSize. In the future if we do
// combine with some of the Arrow conversion tools we will need to unify some of the
// configs.
val numRows = conf.columnBatchSize
// This avoids calling `schema` in the RDD closure, so that we don't need to include the
// entire plan (this) in the closure.
val localSchema = this.schema
child.execute().mapPartitionsInternal { rowIterator =>
if (rowIterator.hasNext) {
new Iterator[ColumnarBatch] {
private val converters = new RowToColumnConverter(localSchema)
private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
OffHeapColumnVector.allocateColumns(numRows, localSchema)
} else {
OnHeapColumnVector.allocateColumns(numRows, localSchema)
}
private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
cb.close()
}
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
cb.close()
}
override def hasNext: Boolean = {
rowIterator.hasNext
}
override def hasNext: Boolean = {
rowIterator.hasNext
}
override def next(): ColumnarBatch = {
cb.setNumRows(0)
vectors.foreach(_.reset())
var rowCount = 0
while (rowCount < numRows && rowIterator.hasNext) {
val row = rowIterator.next()
converters.convert(row, vectors.toArray)
rowCount += 1
override def next(): ColumnarBatch = {
cb.setNumRows(0)
vectors.foreach(_.reset())
var rowCount = 0
while (rowCount < numRows && rowIterator.hasNext) {
val row = rowIterator.next()
converters.convert(row, vectors.toArray)
rowCount += 1
}
cb.setNumRows(rowCount)
numInputRows += rowCount
numOutputBatches += 1
cb
}
}
cb.setNumRows(rowCount)
numInputRows += rowCount
numOutputBatches += 1
cb
} else {
Iterator.empty
}
}
} else {
Iterator.empty
}
}
}

View file

@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ThreadUtils
/**
@ -111,7 +112,7 @@ case class AdaptiveSparkPlanExec(
@transient private val postStageCreationRules = Seq(
ApplyColumnarRulesAndInsertTransitions(context.session.sessionState.columnarRules),
CollapseCodegenStages()
)
) ++ context.session.sessionState.postStageCreationRules
// The partitioning of the query output depends on the shuffle(s) in the final stage. If the
// original plan contains a repartition operator, we need to preserve the specified partitioning,
@ -187,6 +188,13 @@ case class AdaptiveSparkPlanExec(
override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
// This operator reports that output is row-based but because of the adaptive nature of
// execution, we don't really know whether the output is going to row-based or columnar
// until we start running the query, so there is a finalPlanSupportsColumnar method that
// can be called at execution time to determine what the output format is.
// This operator can safely be wrapped in either RowToColumnarExec or ColumnarToRowExec.
override def supportsColumnar: Boolean = false
override def resetMetrics(): Unit = {
metrics.valuesIterator.foreach(_.reset())
executedPlan.resetMetrics()
@ -314,27 +322,41 @@ case class AdaptiveSparkPlanExec(
}
override def executeCollect(): Array[InternalRow] = {
val rdd = getFinalPhysicalPlan().executeCollect()
finalPlanUpdate
rdd
withFinalPlanUpdate(_.executeCollect())
}
override def executeTake(n: Int): Array[InternalRow] = {
val rdd = getFinalPhysicalPlan().executeTake(n)
finalPlanUpdate
rdd
withFinalPlanUpdate(_.executeTake(n))
}
override def executeTail(n: Int): Array[InternalRow] = {
val rdd = getFinalPhysicalPlan().executeTail(n)
finalPlanUpdate
rdd
withFinalPlanUpdate(_.executeTail(n))
}
override def doExecute(): RDD[InternalRow] = {
val rdd = getFinalPhysicalPlan().execute()
withFinalPlanUpdate(_.execute())
}
/**
* Determine if the final query stage supports columnar execution. Calling this method
* will trigger query execution of child query stages if they have not already executed.
*
* If this method returns true then it is safe to call doExecuteColumnar to execute the
* final stage.
*/
def finalPlanSupportsColumnar(): Boolean = {
getFinalPhysicalPlan().supportsColumnar
}
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
withFinalPlanUpdate(_.executeColumnar())
}
private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
val plan = getFinalPhysicalPlan()
val result = fun(plan)
finalPlanUpdate
rdd
result
}
override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {

View file

@ -307,6 +307,10 @@ abstract class BaseSessionStateBuilder(
extensions.buildQueryStagePrepRules(session)
}
protected def postStageCreationRules: Seq[Rule[SparkPlan]] = {
extensions.buildPostStageCreationRules(session)
}
/**
* Create a query execution object.
*/
@ -360,7 +364,8 @@ abstract class BaseSessionStateBuilder(
createQueryExecution,
createClone,
columnarRules,
queryStagePrepRules)
queryStagePrepRules,
postStageCreationRules)
}
}

View file

@ -79,7 +79,8 @@ private[sql] class SessionState(
createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution,
createClone: (SparkSession, SessionState) => SessionState,
val columnarRules: Seq[ColumnarRule],
val queryStagePrepRules: Seq[Rule[SparkPlan]]) {
val queryStagePrepRules: Seq[Rule[SparkPlan]],
val postStageCreationRules: Seq[Rule[SparkPlan]]) {
// The following fields are lazy to avoid creating the Hive client when creating SessionState.
lazy val catalog: SessionCatalog = catalogBuilder()