From 42cf4a2a5efa4c63a54f75b30d7644c336ffa83c Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Thu, 14 May 2015 00:14:59 +0800
Subject: [PATCH] [SPARK-6734] [SQL] Add UDTF.close support in Generate

Some third-party UDTF extensions generate additional rows in the
"GenericUDTF.close()" method, which is supported and documented by Hive:

https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF

However, Spark SQL ignores "GenericUDTF.close()", which causes bugs when
porting jobs from Hive to Spark SQL.

Author: Cheng Hao

Closes #5383 from chenghao-intel/udtf_close and squashes the following commits:

98b4e4b [Cheng Hao] Support UDTF.close

(cherry picked from commit 0da254fb2903c01e059fa7d0dc81df5740312b35)
Signed-off-by: Cheng Lian
---
 .../sql/catalyst/expressions/generators.scala |  6 +++
 .../apache/spark/sql/execution/Generate.scala | 38 +++++++++++++-----
 .../org/apache/spark/sql/hive/hiveUdfs.scala  | 18 +++++++--
 sql/hive/src/test/resources/TestUDTF.jar      | Bin 0 -> 1328 bytes
 ...l Views-0-ac5c96224a534f07b49462ad76620678 |  2 +
 ... SELECT-0-517f834fef35b896ec64399f42b2a151 |  2 +
 .../sql/hive/execution/HiveQuerySuite.scala   | 21 ++++++++++
 7 files changed, 74 insertions(+), 13 deletions(-)
 create mode 100644 sql/hive/src/test/resources/TestUDTF.jar
 create mode 100644 sql/hive/src/test/resources/golden/Test UDTF.close in Lateral Views-0-ac5c96224a534f07b49462ad76620678
 create mode 100644 sql/hive/src/test/resources/golden/Test UDTF.close in SELECT-0-517f834fef35b896ec64399f42b2a151

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 9a6cb048af..747a47bdde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -56,6 +56,12 @@ abstract class Generator extends Expression {
 
   /** Should be implemented by child classes to perform specific Generators. */
   override def eval(input: Row): TraversableOnce[Row]
+
+  /**
+   * Notifies that there are no more rows to process. Clean-up code can run here, and
+   * additional rows can be emitted.
+   */
+  def terminate(): TraversableOnce[Row] = Nil
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 08d9079335..dd02c1f457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -21,6 +21,18 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 
+/**
+ * For lazy computation: ensures generator.terminate() is called only at the very end.
+ * TODO: reuse CompletionIterator?
+ */
+private[execution] sealed case class LazyIterator(func: () => TraversableOnce[Row])
+  extends Iterator[Row] {
+
+  lazy val results = func().toIterator
+  override def hasNext: Boolean = results.hasNext
+  override def next(): Row = results.next()
+}
+
 /**
  * :: DeveloperApi ::
  * Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the
@@ -47,27 +59,33 @@ case class Generate(
 
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
   protected override def doExecute(): RDD[Row] = {
+    // boundGenerator.terminate() should be triggered after all rows in the partition are consumed
     if (join) {
       child.execute().mapPartitions { iter =>
-        val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))
-        // Used to produce rows with no matches when outer = true.
-        val outerProjection =
-          newProjection(child.output ++ nullValues, child.output)
-
-        val joinProjection = newProjection(output, output)
+        val generatorNullRow = Row.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
 
-        iter.flatMap {row =>
+        iter.flatMap { row =>
+          // Always set the left side (the child output) first.
+          joinedRow.withLeft(row)
           val outputRows = boundGenerator.eval(row)
           if (outer && outputRows.isEmpty) {
-            outerProjection(row) :: Nil
+            joinedRow.withRight(generatorNullRow) :: Nil
          } else {
-            outputRows.map(or => joinProjection(joinedRow(row, or)))
+            outputRows.map(or => joinedRow.withRight(or))
          }
+        } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
+          // Leave the last child output row on the left side, matching Hive's behavior.
+          joinedRow.withRight(row)
        }
      }
    } else {
-      child.execute().mapPartitions(iter => iter.flatMap(row => boundGenerator.eval(row)))
+      child.execute().mapPartitions { iter =>
+        iter.flatMap(row => boundGenerator.eval(row)) ++
+        LazyIterator(() => boundGenerator.terminate())
+      }
    }
  }
}
+
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index fd0b6f0585..bc6b3a2d58 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -483,7 +483,11 @@ private[hive] case class HiveGenericUdtf(
   extends Generator with HiveInspectors {
 
   @transient
-  protected lazy val function: GenericUDTF = funcWrapper.createFunction()
+  protected lazy val function: GenericUDTF = {
+    val fun: GenericUDTF = funcWrapper.createFunction()
+    fun.setCollector(collector)
+    fun
+  }
 
   @transient
   protected lazy val inputInspectors = children.map(toInspector)
@@ -494,6 +498,9 @@ private[hive] case class HiveGenericUdtf(
   @transient
   protected lazy val udtInput = new Array[AnyRef](children.length)
 
+  @transient
+  protected lazy val collector = new UDTFCollector
+
   lazy val elementTypes = outputInspector.getAllStructFieldRefs.map { field =>
     (inspectorToDataType(field.getFieldObjectInspector), true)
   }
@@ -502,8 +509,7 @@ private[hive] case class HiveGenericUdtf(
     outputInspector // Make sure initialized.
 
     val inputProjection = new InterpretedProjection(children)
-    val collector = new UDTFCollector
-    function.setCollector(collector)
+
     function.process(wrap(inputProjection(input), inputInspectors, udtInput))
     collector.collectRows()
   }
@@ -525,6 +531,12 @@ private[hive] case class HiveGenericUdtf(
     }
   }
 
+  override def terminate(): TraversableOnce[Row] = {
+    outputInspector // Make sure initialized.
+    function.close()
+    collector.collectRows()
+  }
+
   override def toString: String = {
     s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
   }
diff --git a/sql/hive/src/test/resources/TestUDTF.jar b/sql/hive/src/test/resources/TestUDTF.jar
new file mode 100644
index 0000000000000000000000000000000000000000..514f2d5d26fd358ad5647e0e75edb8ce77b69e30
GIT binary patch
literal 1328
[The 1328-byte binary payload is garbled beyond recovery in this copy, and the
remaining hunks listed in the diffstat (the two 2-line golden files and the
21-line HiveQuerySuite.scala addition) are truncated.]
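
For reference, the kind of UDTF this patch targets forwards its rows from close()
rather than process(). Below is a minimal sketch in Scala, modeled on the
GenericUDTFCount2 example from the Hive wiki page cited in the commit message; the
class name, column name, and double forward() follow that example, and this is not
claimed to be the actual source packaged in TestUDTF.jar.

import java.util.{ArrayList => JArrayList}

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class GenericUDTFCount2 extends GenericUDTF {

  // Running count of input rows; nothing is forwarded until close().
  private var count = 0
  private val forwardObj = new Array[AnyRef](1)

  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    // Declare a single int output column.
    val fieldNames = new JArrayList[String]()
    val fieldOIs = new JArrayList[ObjectInspector]()
    fieldNames.add("col1")
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector)
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
  }

  // process() only accumulates state and forwards no rows.
  override def process(args: Array[AnyRef]): Unit = {
    count += 1
  }

  // All output is emitted here. Before this patch, Spark SQL never invoked
  // close(), so a UDTF like this produced an empty result; Generator.terminate()
  // now routes these rows into the output iterator.
  override def close(): Unit = {
    forwardObj(0) = Int.box(count)
    forward(forwardObj)
    forward(forwardObj) // forwarded twice, as in the wiki's Count2 example
  }
}

Registered with something like CREATE TEMPORARY FUNCTION udtf_count2 AS
'...GenericUDTFCount2' (the function name is illustrative), a query such as
SELECT udtf_count2(value) FROM src returned no rows before this patch; with
terminate() wired into Generate, the rows forwarded from close() now appear in
both plain SELECT and LATERAL VIEW queries, as exercised by the golden tests
listed in the diffstat.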