Avoid dynamic dispatching when unwrapping Hive data.
This is a follow up of PR #758. The `unwrapHiveData` function is now composed statically before actual rows are scanned according to the field object inspector to avoid dynamic dispatching cost. According to the same micro benchmark used in PR #758, this simple change brings slight performance boost: 2.5% for CSV table and 1% for RCFile table. ``` Optimized version: CSV: 6870 ms, RCFile: 5687 ms CSV: 6832 ms, RCFile: 5800 ms CSV: 6822 ms, RCFile: 5679 ms CSV: 6704 ms, RCFile: 5758 ms CSV: 6819 ms, RCFile: 5725 ms Original version: CSV: 7042 ms, RCFile: 5667 ms CSV: 6883 ms, RCFile: 5703 ms CSV: 7115 ms, RCFile: 5665 ms CSV: 7020 ms, RCFile: 5981 ms CSV: 6871 ms, RCFile: 5906 ms ``` Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #935 from liancheng/staticUnwrapping and squashes the following commits: c49c70c [Cheng Lian] Avoid dynamic dispatching when unwrapping Hive data.
This commit is contained in:
parent
ec8be274a7
commit
862283e9cc
|
@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
|
||||||
import org.apache.hadoop.hive.serde.serdeConstants
|
import org.apache.hadoop.hive.serde.serdeConstants
|
||||||
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
|
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
|
||||||
import org.apache.hadoop.hive.serde2.objectinspector._
|
import org.apache.hadoop.hive.serde2.objectinspector._
|
||||||
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
|
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
|
||||||
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
|
|
||||||
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
|
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
|
||||||
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
|
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
|
||||||
import org.apache.hadoop.io.Writable
|
import org.apache.hadoop.io.Writable
|
||||||
|
@ -95,29 +94,34 @@ case class HiveTableScan(
|
||||||
attributes.map { a =>
|
attributes.map { a =>
|
||||||
val ordinal = relation.partitionKeys.indexOf(a)
|
val ordinal = relation.partitionKeys.indexOf(a)
|
||||||
if (ordinal >= 0) {
|
if (ordinal >= 0) {
|
||||||
|
val dataType = relation.partitionKeys(ordinal).dataType
|
||||||
(_: Any, partitionKeys: Array[String]) => {
|
(_: Any, partitionKeys: Array[String]) => {
|
||||||
val value = partitionKeys(ordinal)
|
castFromString(partitionKeys(ordinal), dataType)
|
||||||
val dataType = relation.partitionKeys(ordinal).dataType
|
|
||||||
unwrapHiveData(castFromString(value, dataType))
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
val ref = objectInspector.getAllStructFieldRefs
|
val ref = objectInspector.getAllStructFieldRefs
|
||||||
.find(_.getFieldName == a.name)
|
.find(_.getFieldName == a.name)
|
||||||
.getOrElse(sys.error(s"Can't find attribute $a"))
|
.getOrElse(sys.error(s"Can't find attribute $a"))
|
||||||
|
val fieldObjectInspector = ref.getFieldObjectInspector
|
||||||
|
|
||||||
|
val unwrapHiveData = fieldObjectInspector match {
|
||||||
|
case _: HiveVarcharObjectInspector =>
|
||||||
|
(value: Any) => value.asInstanceOf[HiveVarchar].getValue
|
||||||
|
case _: HiveDecimalObjectInspector =>
|
||||||
|
(value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
|
||||||
|
case _ =>
|
||||||
|
identity[Any] _
|
||||||
|
}
|
||||||
|
|
||||||
(row: Any, _: Array[String]) => {
|
(row: Any, _: Array[String]) => {
|
||||||
val data = objectInspector.getStructFieldData(row, ref)
|
val data = objectInspector.getStructFieldData(row, ref)
|
||||||
unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
|
val hiveData = unwrapData(data, fieldObjectInspector)
|
||||||
|
if (hiveData != null) unwrapHiveData(hiveData) else null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def unwrapHiveData(value: Any) = value match {
|
|
||||||
case varchar: HiveVarchar => varchar.getValue
|
|
||||||
case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
|
|
||||||
case other => other
|
|
||||||
}
|
|
||||||
|
|
||||||
private def castFromString(value: String, dataType: DataType) = {
|
private def castFromString(value: String, dataType: DataType) = {
|
||||||
Cast(Literal(value), dataType).eval(null)
|
Cast(Literal(value), dataType).eval(null)
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,15 +133,14 @@ abstract class HiveComparisonTest
|
||||||
def isSorted(plan: LogicalPlan): Boolean = plan match {
|
def isSorted(plan: LogicalPlan): Boolean = plan match {
|
||||||
case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
|
case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
|
||||||
case PhysicalOperation(_, _, Sort(_, _)) => true
|
case PhysicalOperation(_, _, Sort(_, _)) => true
|
||||||
case _ => plan.children.iterator.map(isSorted).exists(_ == true)
|
case _ => plan.children.iterator.exists(isSorted)
|
||||||
}
|
}
|
||||||
|
|
||||||
val orderedAnswer = hiveQuery.logical match {
|
val orderedAnswer = hiveQuery.logical match {
|
||||||
// Clean out non-deterministic time schema info.
|
// Clean out non-deterministic time schema info.
|
||||||
case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
|
case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
|
||||||
case _: ExplainCommand => answer
|
case _: ExplainCommand => answer
|
||||||
case plan if isSorted(plan) => answer
|
case plan => if (isSorted(plan)) answer else answer.sorted
|
||||||
case _ => answer.sorted
|
|
||||||
}
|
}
|
||||||
orderedAnswer.map(cleanPaths)
|
orderedAnswer.map(cleanPaths)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue