[SPARK-13215] [SQL] remove fallback in codegen

Since we remove the configuration for codegen, we are heavily reply on codegen (also TungstenAggregate require the generated MutableProjection to update UnsafeRow), should remove the fallback, which could make user confusing, see the discussion in SPARK-13116.

Author: Davies Liu <davies@databricks.com>

Closes #11097 from davies/remove_fallback.
This commit is contained in:
Davies Liu 2016-02-05 15:07:43 -08:00 committed by Davies Liu
parent 0bb5b73387
commit 875f507929
3 changed files with 8 additions and 66 deletions

View file

@ -200,47 +200,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute],
useSubexprElimination: Boolean = false): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
}
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
GeneratePredicate.generate(expression, inputSchema)
}
protected def newOrdering(
order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new InterpretedOrdering(order, inputSchema)
}
}
GenerateOrdering.generate(order, inputSchema)
}
/**

View file

@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedMutableProjection, MutableRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, ImperativeAggregate}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
@ -361,13 +361,7 @@ private[sql] case class ScalaUDAF(
val inputAttributes = childrenSchema.toAttributes
log.debug(
s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
try {
GenerateMutableProjection.generate(children, inputAttributes)()
} catch {
case e: Exception =>
log.error("Failed to generate mutable projection, fallback to interpreted", e)
new InterpretedMutableProjection(children, inputAttributes)
}
GenerateMutableProjection.generate(children, inputAttributes)()
}
private[this] lazy val inputToScalaConverters: Any => Any =

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.local
import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{Row, SQLConf}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@ -96,33 +94,13 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case NonFatal(e) =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
GenerateMutableProjection.generate(expressions, inputSchema)
}
protected def newPredicate(
expression: Expression,
inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case NonFatal(e) =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
GeneratePredicate.generate(expression, inputSchema)
}
}