[SPARK-12719][SQL] SQL generation support for Generate

## What changes were proposed in this pull request?

This PR adds SQL generation support for `Generate` operator. It always converts `Generate` operator into `LATERAL VIEW` format as there are many limitations to put UDTF in project list.

This PR is based on https://github.com/apache/spark/pull/11658, please see the last commit to review the real changes.

Thanks dilipbiswal for his initial work! Takes over https://github.com/apache/spark/pull/11596

## How was this patch tested?

new tests in `LogicalPlanToSQLSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11696 from cloud-fan/generate.
This commit is contained in:
Wenchen Fan 2016-03-17 20:25:05 +08:00
parent 8ef3399aff
commit 1974d1d34d
2 changed files with 170 additions and 7 deletions

View file

@ -126,6 +126,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case w: Window =>
windowToSQL(w)
case g: Generate =>
generateToSQL(g)
case Limit(limitExpr, child) =>
s"${toSQL(child)} LIMIT ${limitExpr.sql}"
@ -250,6 +253,42 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}
private def generateToSQL(g: Generate): String = {
val columnAliases = g.generatorOutput.map(_.sql).mkString(",")
val childSQL = if (g.child == OneRowRelation) {
// This only happens when we put UDTF in project list and there is no FROM clause. Because we
// always generate LATERAL VIEW for `Generate`, here we use a trick to put a dummy sub-query
// after FROM clause, so that we can generate a valid LATERAL VIEW SQL string.
// For example, if the original SQL is: "SELECT EXPLODE(ARRAY(1, 2))", we will convert in to
// LATERAL VIEW format, and generate:
// SELECT col FROM (SELECT 1) sub-q0 LATERAL VIEW EXPLODE(ARRAY(1, 2)) sub_q1 AS col
s"(SELECT 1) ${SQLBuilder.newSubqueryName}"
} else {
toSQL(g.child)
}
// The final SQL string for Generate contains 7 parts:
// 1. the SQL of child, can be a table or sub-query
// 2. the LATERAL VIEW keyword
// 3. an optional OUTER keyword
// 4. the SQL of generator, e.g. EXPLODE(array_col)
// 5. the table alias for output columns of generator.
// 6. the AS keyword
// 7. the column alias, can be more than one, e.g. AS key, value
// An concrete example: "tbl LATERAL VIEW EXPLODE(map_col) sub_q AS key, value", and the builder
// will put it in FROM clause later.
build(
childSQL,
"LATERAL VIEW",
if (g.outer) "OUTER" else "",
g.generator.sql,
SQLBuilder.newSubqueryName,
"AS",
columnAliases
)
}
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
@ -423,6 +462,17 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case j: Join => j.copy(
left = addSubqueryIfNeeded(j.left),
right = addSubqueryIfNeeded(j.right))
// A special case for Generate. When we put UDTF in project list, followed by WHERE, e.g.
// SELECT EXPLODE(arr) FROM tbl WHERE id > 1, the Filter operator will be under Generate
// operator and we need to add a sub-query between them, as it's not allowed to have a WHERE
// before LATERAL VIEW, e.g. "... FROM tbl WHERE id > 2 EXPLODE(arr) ..." is illegal.
case g @ Generate(_, _, _, _, _, f: Filter) =>
// Add an extra `Project` to make sure we can generate legal SQL string for sub-query,
// for example, Subquery -> Filter -> Table will generate "(tbl WHERE ...) AS name", which
// misses the SELECT part.
val proj = Project(f.output, f)
g.copy(child = addSubquery(proj))
}
}
@ -431,13 +481,14 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
}
private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
case _: SubqueryAlias => plan
case _: Filter => plan
case _: Join => plan
case _: LocalLimit => plan
case _: GlobalLimit => plan
case _: SQLTable => plan
case OneRowRelation => plan
case _: SubqueryAlias |
_: Filter |
_: Join |
_: LocalLimit |
_: GlobalLimit |
_: SQLTable |
_: Generate |
OneRowRelation => plan
case _ => addSubquery(plan)
}
}

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
import scala.util.control.NonFatal
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SQLTestUtils
@ -45,12 +46,28 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
.select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write
.saveAsTable("parquet_t2")
def createArray(id: Column): Column = {
when(id % 3 === 0, lit(null)).otherwise(array('id, 'id + 1))
}
sqlContext
.range(10)
.select(
createArray('id).as("arr"),
array(array('id), createArray('id)).as("arr2"),
lit("""{"f1": "1", "f2": "2", "f3": 3}""").as("json"),
'id
)
.write
.saveAsTable("parquet_t3")
}
override protected def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS parquet_t0")
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
sql("DROP TABLE IF EXISTS parquet_t3")
sql("DROP TABLE IF EXISTS t0")
}
@ -625,4 +642,99 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|HAVING MAX(a.KEY) > 0
""".stripMargin)
}
test("generator in project list without FROM clause") {
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3))")
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val")
}
test("generator in project list with non-referenced table") {
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) FROM t0")
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val FROM t0")
}
test("generator in project list with referenced table") {
checkHiveQl("SELECT EXPLODE(arr) FROM parquet_t3")
checkHiveQl("SELECT EXPLODE(arr) AS val FROM parquet_t3")
}
test("generator in project list with non-UDTF expressions") {
checkHiveQl("SELECT EXPLODE(arr), id FROM parquet_t3")
checkHiveQl("SELECT EXPLODE(arr) AS val, id as a FROM parquet_t3")
}
test("generator in lateral view") {
checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW EXPLODE(arr) exp AS val")
checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW OUTER EXPLODE(arr) exp AS val")
}
test("generator in lateral view with ambiguous names") {
checkHiveQl(
"""
|SELECT exp.id, parquet_t3.id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr) exp AS id
""".stripMargin)
checkHiveQl(
"""
|SELECT exp.id, parquet_t3.id
|FROM parquet_t3
|LATERAL VIEW OUTER EXPLODE(arr) exp AS id
""".stripMargin)
}
test("use JSON_TUPLE as generator") {
checkHiveQl(
"""
|SELECT c0, c1, c2
|FROM parquet_t3
|LATERAL VIEW JSON_TUPLE(json, 'f1', 'f2', 'f3') jt
""".stripMargin)
checkHiveQl(
"""
|SELECT a, b, c
|FROM parquet_t3
|LATERAL VIEW JSON_TUPLE(json, 'f1', 'f2', 'f3') jt AS a, b, c
""".stripMargin)
}
test("nested generator in lateral view") {
checkHiveQl(
"""
|SELECT val, id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
|LATERAL VIEW EXPLODE(nested_array) exp1 AS val
""".stripMargin)
checkHiveQl(
"""
|SELECT val, id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
|LATERAL VIEW OUTER EXPLODE(nested_array) exp1 AS val
""".stripMargin)
}
test("generate with other operators") {
checkHiveQl(
"""
|SELECT EXPLODE(arr) AS val, id
|FROM parquet_t3
|WHERE id > 2
|ORDER BY val, id
|LIMIT 5
""".stripMargin)
checkHiveQl(
"""
|SELECT val, id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
|LATERAL VIEW EXPLODE(nested_array) exp1 AS val
|WHERE val > 2
|ORDER BY val, id
|LIMIT 5
""".stripMargin)
}
}