diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 6175456c58..620e8de83a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -335,16 +335,6 @@ object CatalystTypeConverters { override def toScalaImpl(row: InternalRow, column: Int): Double = row.getDouble(column) } - /** - * Converts Scala objects to catalyst rows / types. This method is slow, and for batch - * conversion you should be using converter produced by createToCatalystConverter. - * Note: This is always called after schemaFor has been called. - * This ordering is important for UDT registration. - */ - def convertToCatalyst(scalaValue: Any, dataType: DataType): Any = { - getConverterForType(dataType).toCatalyst(scalaValue) - } - /** * Creates a converter function that will convert Scala objects to the specified Catalyst type. * Typical use case would be converting a collection of rows that have the same schema. You will diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index c2d739b529..b4b00f5584 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -258,7 +258,7 @@ class ScalaReflectionSuite extends SparkFunSuite { val data = PrimitiveData(1, 1, 1, 1, 1, 1, true) val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true) val dataType = schemaFor[PrimitiveData].dataType - assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData) + assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData) } test("convert Option[Product] to catalyst") { @@ -268,7 +268,7 @@ class ScalaReflectionSuite extends SparkFunSuite { val dataType = schemaFor[OptionalData].dataType val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true, InternalRow(1, 1, 1, 1, 1, 1, true)) - assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData) + assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData) } test("infer schema from case class with multiple constructors") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 444916bbad..466258e76f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1029,10 +1029,10 @@ class DataFrame private[sql]( val elementTypes = schema.toAttributes.map { attr => (attr.dataType, attr.nullable) } val names = schema.toAttributes.map(_.name) + val convert = CatalystTypeConverters.createToCatalystConverter(schema) val rowFunction = - f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema) - .asInstanceOf[InternalRow])) + f.andThen(_.map(convert(_).asInstanceOf[InternalRow])) val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr)) Generate(generator, join = true, outer = false, @@ -1059,8 +1059,8 @@ class DataFrame private[sql]( val names = attributes.map(_.name) def rowFunction(row: Row): TraversableOnce[InternalRow] = { - f(row(0).asInstanceOf[A]).map(o => - InternalRow(CatalystTypeConverters.convertToCatalyst(o, dataType))) + val convert = CatalystTypeConverters.createToCatalystConverter(dataType) + f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o))) } val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 9d1f89d6d7..6b605f7130 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -536,12 +536,12 @@ class SQLContext(@transient val sparkContext: SparkContext) Class.forName(className, true, Utils.getContextOrSparkClassLoader)) val extractors = localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod) - + val methodsToConverts = extractors.zip(attributeSeq).map { case (e, attr) => + (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType)) + } iter.map { row => new GenericRow( - extractors.zip(attributeSeq).map { case (e, attr) => - CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType) - }.toArray[Any] + methodsToConverts.map { case (e, convert) => convert(e.invoke(row)) }.toArray[Any] ) : InternalRow } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 653792ea2e..c9dfcea5d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -65,8 +65,8 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray protected override def doExecute(): RDD[InternalRow] = { - val converted = sideEffectResult.map(r => - CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow]) + val convert = CatalystTypeConverters.createToCatalystConverter(schema) + val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow]) sqlContext.sparkContext.parallelize(converted, 1) } }