[SPARK-8381][SQL]reuse typeConvert when convert Seq[Row] to catalyst type

reuse-typeConvert when convert Seq[Row] to CatalystType

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #6831 from lianhuiwang/reuse-typeConvert and squashes the following commits:

1fec395 [Lianhui Wang] remove CatalystTypeConverters.convertToCatalyst
714462d [Lianhui Wang] add package[sql]
9d1fbf3 [Lianhui Wang] address JoshRosen's comments
768956f [Lianhui Wang] update scala style
4498c62 [Lianhui Wang] reuse typeConvert
This commit is contained in:
Lianhui Wang 2015-06-17 22:52:47 -07:00 committed by Josh Rosen
parent 3b6107704f
commit 9db73ec124
5 changed files with 12 additions and 22 deletions

View file

@ -335,16 +335,6 @@ object CatalystTypeConverters {
override def toScalaImpl(row: InternalRow, column: Int): Double = row.getDouble(column)
}
/**
* Converts Scala objects to catalyst rows / types. This method is slow, and for batch
* conversion you should be using converter produced by createToCatalystConverter.
* Note: This is always called after schemaFor has been called.
* This ordering is important for UDT registration.
*/
def convertToCatalyst(scalaValue: Any, dataType: DataType): Any = {
getConverterForType(dataType).toCatalyst(scalaValue)
}
/**
* Creates a converter function that will convert Scala objects to the specified Catalyst type.
* Typical use case would be converting a collection of rows that have the same schema. You will

View file

@ -258,7 +258,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
val data = PrimitiveData(1, 1, 1, 1, 1, 1, true)
val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
val dataType = schemaFor[PrimitiveData].dataType
assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
}
test("convert Option[Product] to catalyst") {
@ -268,7 +268,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
val dataType = schemaFor[OptionalData].dataType
val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
InternalRow(1, 1, 1, 1, 1, 1, true))
assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
}
test("infer schema from case class with multiple constructors") {

View file

@ -1029,10 +1029,10 @@ class DataFrame private[sql](
val elementTypes = schema.toAttributes.map { attr => (attr.dataType, attr.nullable) }
val names = schema.toAttributes.map(_.name)
val convert = CatalystTypeConverters.createToCatalystConverter(schema)
val rowFunction =
f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema)
.asInstanceOf[InternalRow]))
f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
Generate(generator, join = true, outer = false,
@ -1059,8 +1059,8 @@ class DataFrame private[sql](
val names = attributes.map(_.name)
def rowFunction(row: Row): TraversableOnce[InternalRow] = {
f(row(0).asInstanceOf[A]).map(o =>
InternalRow(CatalystTypeConverters.convertToCatalyst(o, dataType)))
val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
}
val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)

View file

@ -536,12 +536,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
Class.forName(className, true, Utils.getContextOrSparkClassLoader))
val extractors =
localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
val methodsToConverts = extractors.zip(attributeSeq).map { case (e, attr) =>
(e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
}
iter.map { row =>
new GenericRow(
extractors.zip(attributeSeq).map { case (e, attr) =>
CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType)
}.toArray[Any]
methodsToConverts.map { case (e, convert) => convert(e.invoke(row)) }.toArray[Any]
) : InternalRow
}
}

View file

@ -65,8 +65,8 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
protected override def doExecute(): RDD[InternalRow] = {
val converted = sideEffectResult.map(r =>
CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow])
val convert = CatalystTypeConverters.createToCatalystConverter(schema)
val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
sqlContext.sparkContext.parallelize(converted, 1)
}
}