From 1974d1d34d42c91730eaf45f7958cfab4827a14c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 17 Mar 2016 20:25:05 +0800 Subject: [PATCH] [SPARK-12719][SQL] SQL generation support for Generate ## What changes were proposed in this pull request? This PR adds SQL generation support for `Generate` operator. It always converts `Generate` operator into `LATERAL VIEW` format as there are many limitations to put UDTF in project list. This PR is based on https://github.com/apache/spark/pull/11658, please see the last commit to review the real changes. Thanks dilipbiswal for his initial work! Takes over https://github.com/apache/spark/pull/11596 ## How was this patch tested? new tests in `LogicalPlanToSQLSuite` Author: Wenchen Fan Closes #11696 from cloud-fan/generate. --- .../apache/spark/sql/hive/SQLBuilder.scala | 65 ++++++++-- .../sql/hive/LogicalPlanToSQLSuite.scala | 112 ++++++++++++++++++ 2 files changed, 170 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index cd417ce3cc..05dfad239a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -126,6 +126,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case w: Window => windowToSQL(w) + case g: Generate => + generateToSQL(g) + case Limit(limitExpr, child) => s"${toSQL(child)} LIMIT ${limitExpr.sql}" @@ -250,6 +253,42 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) } + private def generateToSQL(g: Generate): String = { + val columnAliases = g.generatorOutput.map(_.sql).mkString(",") + + val childSQL = if (g.child == OneRowRelation) { + // This only happens when we put UDTF in project list and there is no FROM clause. Because we + // always generate LATERAL VIEW for `Generate`, here we use a trick to put a dummy sub-query + // after FROM clause, so that we can generate a valid LATERAL VIEW SQL string. + // For example, if the original SQL is: "SELECT EXPLODE(ARRAY(1, 2))", we will convert in to + // LATERAL VIEW format, and generate: + // SELECT col FROM (SELECT 1) sub-q0 LATERAL VIEW EXPLODE(ARRAY(1, 2)) sub_q1 AS col + s"(SELECT 1) ${SQLBuilder.newSubqueryName}" + } else { + toSQL(g.child) + } + + // The final SQL string for Generate contains 7 parts: + // 1. the SQL of child, can be a table or sub-query + // 2. the LATERAL VIEW keyword + // 3. an optional OUTER keyword + // 4. the SQL of generator, e.g. EXPLODE(array_col) + // 5. the table alias for output columns of generator. + // 6. the AS keyword + // 7. the column alias, can be more than one, e.g. AS key, value + // An concrete example: "tbl LATERAL VIEW EXPLODE(map_col) sub_q AS key, value", and the builder + // will put it in FROM clause later. + build( + childSQL, + "LATERAL VIEW", + if (g.outer) "OUTER" else "", + g.generator.sql, + SQLBuilder.newSubqueryName, + "AS", + columnAliases + ) + } + private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = output1.size == output2.size && output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) @@ -423,6 +462,17 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case j: Join => j.copy( left = addSubqueryIfNeeded(j.left), right = addSubqueryIfNeeded(j.right)) + + // A special case for Generate. When we put UDTF in project list, followed by WHERE, e.g. + // SELECT EXPLODE(arr) FROM tbl WHERE id > 1, the Filter operator will be under Generate + // operator and we need to add a sub-query between them, as it's not allowed to have a WHERE + // before LATERAL VIEW, e.g. "... FROM tbl WHERE id > 2 EXPLODE(arr) ..." is illegal. + case g @ Generate(_, _, _, _, _, f: Filter) => + // Add an extra `Project` to make sure we can generate legal SQL string for sub-query, + // for example, Subquery -> Filter -> Table will generate "(tbl WHERE ...) AS name", which + // misses the SELECT part. + val proj = Project(f.output, f) + g.copy(child = addSubquery(proj)) } } @@ -431,13 +481,14 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi } private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match { - case _: SubqueryAlias => plan - case _: Filter => plan - case _: Join => plan - case _: LocalLimit => plan - case _: GlobalLimit => plan - case _: SQLTable => plan - case OneRowRelation => plan + case _: SubqueryAlias | + _: Filter | + _: Join | + _: LocalLimit | + _: GlobalLimit | + _: SQLTable | + _: Generate | + OneRowRelation => plan case _ => addSubquery(plan) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index ca46c229f1..f3cb6f8511 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import scala.util.control.NonFatal +import org.apache.spark.sql.Column import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SQLTestUtils @@ -45,12 +46,28 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd) .write .saveAsTable("parquet_t2") + + def createArray(id: Column): Column = { + when(id % 3 === 0, lit(null)).otherwise(array('id, 'id + 1)) + } + + sqlContext + .range(10) + .select( + createArray('id).as("arr"), + array(array('id), createArray('id)).as("arr2"), + lit("""{"f1": "1", "f2": "2", "f3": 3}""").as("json"), + 'id + ) + .write + .saveAsTable("parquet_t3") } override protected def afterAll(): Unit = { sql("DROP TABLE IF EXISTS parquet_t0") sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") + sql("DROP TABLE IF EXISTS parquet_t3") sql("DROP TABLE IF EXISTS t0") } @@ -625,4 +642,99 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { |HAVING MAX(a.KEY) > 0 """.stripMargin) } + + test("generator in project list without FROM clause") { + checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3))") + checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val") + } + + test("generator in project list with non-referenced table") { + checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) FROM t0") + checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val FROM t0") + } + + test("generator in project list with referenced table") { + checkHiveQl("SELECT EXPLODE(arr) FROM parquet_t3") + checkHiveQl("SELECT EXPLODE(arr) AS val FROM parquet_t3") + } + + test("generator in project list with non-UDTF expressions") { + checkHiveQl("SELECT EXPLODE(arr), id FROM parquet_t3") + checkHiveQl("SELECT EXPLODE(arr) AS val, id as a FROM parquet_t3") + } + + test("generator in lateral view") { + checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW EXPLODE(arr) exp AS val") + checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW OUTER EXPLODE(arr) exp AS val") + } + + test("generator in lateral view with ambiguous names") { + checkHiveQl( + """ + |SELECT exp.id, parquet_t3.id + |FROM parquet_t3 + |LATERAL VIEW EXPLODE(arr) exp AS id + """.stripMargin) + checkHiveQl( + """ + |SELECT exp.id, parquet_t3.id + |FROM parquet_t3 + |LATERAL VIEW OUTER EXPLODE(arr) exp AS id + """.stripMargin) + } + + test("use JSON_TUPLE as generator") { + checkHiveQl( + """ + |SELECT c0, c1, c2 + |FROM parquet_t3 + |LATERAL VIEW JSON_TUPLE(json, 'f1', 'f2', 'f3') jt + """.stripMargin) + checkHiveQl( + """ + |SELECT a, b, c + |FROM parquet_t3 + |LATERAL VIEW JSON_TUPLE(json, 'f1', 'f2', 'f3') jt AS a, b, c + """.stripMargin) + } + + test("nested generator in lateral view") { + checkHiveQl( + """ + |SELECT val, id + |FROM parquet_t3 + |LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array + |LATERAL VIEW EXPLODE(nested_array) exp1 AS val + """.stripMargin) + + checkHiveQl( + """ + |SELECT val, id + |FROM parquet_t3 + |LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array + |LATERAL VIEW OUTER EXPLODE(nested_array) exp1 AS val + """.stripMargin) + } + + test("generate with other operators") { + checkHiveQl( + """ + |SELECT EXPLODE(arr) AS val, id + |FROM parquet_t3 + |WHERE id > 2 + |ORDER BY val, id + |LIMIT 5 + """.stripMargin) + + checkHiveQl( + """ + |SELECT val, id + |FROM parquet_t3 + |LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array + |LATERAL VIEW EXPLODE(nested_array) exp1 AS val + |WHERE val > 2 + |ORDER BY val, id + |LIMIT 5 + """.stripMargin) + } }