[SPARK-34382][SQL] Support LATERAL subqueries

### What changes were proposed in this pull request?
This PR adds support for lateral subqueries. A lateral subquery is a subquery preceded by the `LATERAL` keyword in the FROM clause of a query that can reference columns in the preceding FROM items. For example:
```sql
SELECT * FROM t1, LATERAL (SELECT * FROM t2 WHERE t1.a = t2.c)
```
A new subquery expression, `LateralSubquery`, is used to represent a lateral subquery. It is similar to `ScalarSubquery` but can return multiple rows and columns. A new logical unary node, `LateralJoin`, is used to represent a lateral join.

Here is the analyzed plan for the above query:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a], Inner
   :  +- Project [c, d]
   :     +- Filter (outer(a) = c)
   :        +- Relation [c, d]
   +- Relation [a, b]
```

Similar to a correlated subquery, a lateral subquery can be viewed as a dependent (nested loop) join where the evaluation of the right subtree depends on the current value of the left subtree. The same technique used to decorrelate a subquery is applied to decorrelate a lateral join:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a && a = c], Inner  // pull up correlated predicates as join conditions
   :  +- Project [c, d]
   :     +- Relation [c, d]
   +- Relation [a, b]
```
Then the lateral join can be rewritten into a normal join:
```scala
Join Inner (a = c)
:- Relation [a, b]
+- Relation [c, d]
```
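
The equivalence between the dependent evaluation and the rewritten join can be sketched with plain Scala collections. This is only an illustration, not Spark code: the row types `T1` and `T2` and both helper functions are hypothetical stand-ins for the relations `t1(a, b)` and `t2(c, d)`, and NULL semantics are ignored.

```scala
object LateralJoinSketch {
  // Hypothetical row types standing in for relations t1(a, b) and t2(c, d).
  final case class T1(a: Int, b: Int)
  final case class T2(c: Int, d: Int)

  // Dependent (nested loop) evaluation: the right side is re-evaluated for
  // every left row and may read the current value of `a`.
  def lateralJoin(t1: Seq[T1], t2: Seq[T2]): Seq[(Int, Int, Int, Int)] =
    for {
      l <- t1
      r <- t2.filter(_.c == l.a) // SELECT * FROM t2 WHERE t1.a = t2.c
    } yield (l.a, l.b, r.c, r.d)

  // Decorrelated form: the right side no longer reads the left row, and the
  // correlated predicate has become the join condition (a = c).
  def innerJoin(t1: Seq[T1], t2: Seq[T2]): Seq[(Int, Int, Int, Int)] =
    for {
      l <- t1
      r <- t2
      if l.a == r.c
    } yield (l.a, l.b, r.c, r.d)
}
```

For any input, both functions should return the same rows, which is what makes the rewrite into a normal join valid for this simple case.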

#### Follow-ups:
1. Similar to rewriting correlated scalar subqueries, rewriting lateral joins is also subject to the COUNT bug (see SPARK-15370 for more details). This is **not** handled in the current PR because it requires a sizeable amount of refactoring. It will be addressed in a subsequent PR (SPARK-35551).
2. Currently, Spark does not use outer query references to resolve star expressions in subqueries. This is not specific to lateral subqueries and can be handled in a separate PR (SPARK-35618).
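
The COUNT bug from the first follow-up can be illustrated with a small collection-based sketch. It is not Spark code: `T1`, `T2`, `dependent`, and `naiveRewrite` are hypothetical names, and the "naive rewrite" is a simplified model of decorrelating an aggregate into a grouped inner join.

```scala
object CountBugSketch {
  // Hypothetical row types standing in for relations t1(a) and t2(c).
  final case class T1(a: Int)
  final case class T2(c: Int)

  // Dependent evaluation of
  //   SELECT * FROM t1, LATERAL (SELECT COUNT(*) AS cnt FROM t2 WHERE t2.c = t1.a)
  // A global aggregate always returns exactly one row, so every t1 row survives.
  def dependent(t1: Seq[T1], t2: Seq[T2]): Seq[(Int, Long)] =
    t1.map(l => (l.a, t2.count(_.c == l.a).toLong))

  // Naive decorrelation: aggregate t2 grouped by c, then inner join on a = c.
  // Values of `a` without a match in t2 have no group, so their rows vanish.
  def naiveRewrite(t1: Seq[T1], t2: Seq[T2]): Seq[(Int, Long)] = {
    val counts: Map[Int, Long] =
      t2.groupBy(_.c).map { case (c, rows) => c -> rows.size.toLong }
    for {
      l <- t1
      cnt <- counts.get(l.a).toSeq
    } yield (l.a, cnt)
  }
}
```

With `t1 = {1, 2}` and `t2 = {1, 1}`, the dependent evaluation keeps the row for `a = 2` with a count of 0, while the naive rewrite drops it.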

### Why are the changes needed?
To support an ANSI SQL feature.

### Does this PR introduce _any_ user-facing change?
Yes. It allows users to use lateral subqueries in the FROM clause of a query.

### How was this patch tested?
- Parser test: `PlanParserSuite.scala`
- Analyzer test: `ResolveSubquerySuite.scala`
- Optimizer test: `PullupCorrelatedPredicatesSuite.scala`
- SQL test: `join-lateral.sql`, `postgreSQL/join.sql`

Closes #32303 from allisonwang-db/spark-34382-lateral.

Lead-authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
allisonwang-db 2021-06-09 17:08:32 +00:00 committed by Wenchen Fan
parent 519be238be
commit f49bf1a072
27 changed files with 1441 additions and 132 deletions


@@ -318,7 +318,7 @@ Below is a list of all the keywords in Spark SQL.
 |JOIN|reserved|strict-non-reserved|reserved|
 |KEYS|non-reserved|non-reserved|non-reserved|
 |LAST|non-reserved|non-reserved|non-reserved|
-|LATERAL|non-reserved|non-reserved|reserved|
+|LATERAL|reserved|strict-non-reserved|reserved|
 |LAZY|non-reserved|non-reserved|non-reserved|
 |LEADING|reserved|non-reserved|reserved|
 |LEFT|reserved|strict-non-reserved|reserved|


@@ -643,12 +643,12 @@ setQuantifier
     ;
 relation
-    : relationPrimary joinRelation*
+    : LATERAL? relationPrimary joinRelation*
     ;
 joinRelation
-    : (joinType) JOIN right=relationPrimary joinCriteria?
-    | NATURAL joinType JOIN right=relationPrimary
+    : (joinType) JOIN LATERAL? right=relationPrimary joinCriteria?
+    | NATURAL joinType JOIN LATERAL? right=relationPrimary
     ;
 joinType
@@ -1122,7 +1122,6 @@ ansiNonReserved
     | ITEMS
     | KEYS
     | LAST
-    | LATERAL
     | LAZY
     | LIKE
     | LIMIT
@@ -1252,6 +1251,7 @@ strictNonReserved
     | INNER
     | INTERSECT
     | JOIN
+    | LATERAL
     | LEFT
     | NATURAL
     | ON
@@ -1373,7 +1373,6 @@ nonReserved
     | ITEMS
     | KEYS
     | LAST
-    | LATERAL
     | LAZY
     | LEADING
     | LIKE


@@ -2278,6 +2278,14 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Note: CTEs are handled in CTESubstitution.
    */
   object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
+    /**
+     * Wrap attributes in the expression with [[OuterReference]]s.
+     */
+    private def wrapOuterReference[E <: Expression](e: E): E = {
+      e.transform { case a: Attribute => OuterReference(a) }.asInstanceOf[E]
+    }
     /**
      * Resolve the correlated expressions in a subquery by using the an outer plans' references. All
      * resolved outer references are wrapped in an [[OuterReference]]
@@ -2290,7 +2298,7 @@ class Analyzer(override val catalogManager: CatalogManager)
           withPosition(u) {
             try {
               outer.resolve(nameParts, resolver) match {
-                case Some(outerAttr) => OuterReference(outerAttr)
+                case Some(outerAttr) => wrapOuterReference(outerAttr)
                 case None => u
               }
             } catch {
@@ -2349,8 +2357,7 @@ class Analyzer(override val catalogManager: CatalogManager)
      * outer plan to get evaluated.
      */
     private def resolveSubQueries(plan: LogicalPlan, plans: Seq[LogicalPlan]): LogicalPlan = {
-      plan.transformAllExpressionsWithPruning(_.containsAnyPattern(SCALAR_SUBQUERY,
-        EXISTS_SUBQUERY, IN_SUBQUERY), ruleId) {
+      plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION), ruleId) {
         case s @ ScalarSubquery(sub, _, exprId, _) if !sub.resolved =>
           resolveSubQuery(s, plans)(ScalarSubquery(_, _, exprId))
         case e @ Exists(sub, _, exprId, _) if !sub.resolved =>
@@ -2361,6 +2368,8 @@ class Analyzer(override val catalogManager: CatalogManager)
               ListQuery(plan, exprs, exprId, plan.output)
             })
           InSubquery(values, expr.asInstanceOf[ListQuery])
+        case s @ LateralSubquery(sub, _, exprId, _) if !sub.resolved =>
+          resolveSubQuery(s, plans)(LateralSubquery(_, _, exprId))
       }
     }
@@ -2368,11 +2377,13 @@ class Analyzer(override val catalogManager: CatalogManager)
      * Resolve and rewrite all subqueries in an operator tree..
      */
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
-      _.containsAnyPattern(SCALAR_SUBQUERY, EXISTS_SUBQUERY, IN_SUBQUERY), ruleId) {
+      _.containsPattern(PLAN_EXPRESSION), ruleId) {
       // In case of HAVING (a filter after an aggregate) we use both the aggregate and
       // its child for resolution.
       case f @ Filter(_, a: Aggregate) if f.childrenResolved =>
         resolveSubQueries(f, Seq(a, a.child))
+      case j: LateralJoin if j.left.resolved =>
+        resolveSubQueries(j, j.children)
       // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
       case q: UnaryNode if q.childrenResolved =>
         resolveSubQueries(q, q.children)


@@ -777,6 +777,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             }
         }
+      case _: LateralSubquery =>
+        assert(plan.isInstanceOf[LateralJoin])
       case inSubqueryOrExistsSubquery =>
         plan match {
           case _: Filter | _: SupportsSubquery | _: Join => // Ok
@@ -788,7 +791,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     // Validate to make sure the correlations appearing in the query are valid and
     // allowed by spark.
-    checkCorrelationsInSubquery(expr.plan)
+    checkCorrelationsInSubquery(expr.plan, isLateral = plan.isInstanceOf[LateralJoin])
   }
   /**
@@ -827,7 +830,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
    * Validates to make sure the outer references appearing inside the subquery
    * are allowed.
    */
-  private def checkCorrelationsInSubquery(sub: LogicalPlan): Unit = {
+  private def checkCorrelationsInSubquery(sub: LogicalPlan, isLateral: Boolean = false): Unit = {
     // Validate that correlated aggregate expression do not contain a mixture
     // of outer and local references.
     def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
@@ -849,12 +852,21 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       }
     }
+    // Check whether the logical plan node can host outer references.
+    // A `Project` can host outer references if it is inside a lateral subquery.
+    // Otherwise, only Filter can only outer references.
+    def canHostOuter(plan: LogicalPlan): Boolean = plan match {
+      case _: Filter => true
+      case _: Project => isLateral
+      case _ => false
+    }
     // Make sure a plan's expressions do not contain :
     // 1. Aggregate expressions that have mixture of outer and local references.
-    // 2. Expressions containing outer references on plan nodes other than Filter.
+    // 2. Expressions containing outer references on plan nodes other than allowed operators.
     def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
       p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
-      if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) {
+      if (!canHostOuter(p) && p.expressions.exists(containsOuter)) {
         failAnalysis(
           "Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
             s"clauses:\n$p")
@@ -988,6 +1000,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
         case r: RepartitionByExpression =>
           failOnInvalidOuterReference(r)
+        case l: LateralJoin =>
+          failOnInvalidOuterReference(l)
         // Category 3:
         // Filter is one of the two operators allowed to host correlated expressions.
         // The other operator is Join. Filter can be anywhere in a correlated subquery.


@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeSet, NamedExpression, OuterReference, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
@@ -41,11 +41,14 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     renewDuplicatedRelations(mutable.HashSet.empty, plan)._1.resolveOperatorsUpWithPruning(
-      _.containsAnyPattern(JOIN, INTERSECT, EXCEPT, UNION, COMMAND), ruleId) {
+      _.containsAnyPattern(JOIN, LATERAL_JOIN, INTERSECT, EXCEPT, UNION, COMMAND), ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
       case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
         j.copy(right = dedupRight(left, right))
+      // Resolve duplicate output for LateralJoin.
+      case j @ LateralJoin(left, right, _, _) if right.resolved && !j.duplicateResolved =>
+        j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
       // intersect/except will be rewritten to join at the beginning of optimizer. Here we need to
       // deduplicate the right side plan, so that we won't produce an invalid self-join later.
       case i @ Intersect(left, right, _) if !i.duplicateResolved =>
@@ -182,6 +185,16 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
             if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
           Seq((oldVersion, oldVersion.copy(projectList = newAliases(projectList))))
+        // Handle projects that create conflicting outer references.
+        case oldVersion @ Project(projectList, _)
+            if findOuterReferences(projectList).intersect(conflictingAttributes).nonEmpty =>
+          // Add alias to conflicting outer references.
+          val aliasedProjectList = projectList.map {
+            case o @ OuterReference(a) if conflictingAttributes.contains(a) => Alias(o, a.name)()
+            case other => other
+          }
+          Seq((oldVersion, oldVersion.copy(projectList = aliasedProjectList)))
         // We don't need to search child plan recursively if the projectList of a Project
         // is only composed of Alias and doesn't contain any conflicting attributes.
         // Because, even if the child plan has some conflicting attributes, the attributes
@@ -273,4 +286,8 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
   private def findAliases(projectList: Seq[NamedExpression]): AttributeSet = {
     AttributeSet(projectList.collect { case a: Alias => a.toAttribute })
   }
+  private def findOuterReferences(projectList: Seq[NamedExpression]): AttributeSet = {
+    AttributeSet(projectList.collect { case o: OuterReference => o.toAttribute })
+  }
 }


@@ -285,6 +285,39 @@ object ScalarSubquery {
   }
 }
+/**
+ * A subquery that can return multiple rows and columns. This should be rewritten as a join
+ * with the outer query during the optimization phase.
+ *
+ * Note: `exprId` is used to have a unique name in explain string output.
+ */
+case class LateralSubquery(
+    plan: LogicalPlan,
+    outerAttrs: Seq[Expression] = Seq.empty,
+    exprId: ExprId = NamedExpression.newExprId,
+    joinCond: Seq[Expression] = Seq.empty)
+  extends SubqueryExpression(plan, outerAttrs, exprId, joinCond) with Unevaluable {
+  override def dataType: DataType = plan.output.toStructType
+  override def nullable: Boolean = true
+  override def withNewPlan(plan: LogicalPlan): LateralSubquery = copy(plan = plan)
+  override def toString: String = s"lateral-subquery#${exprId.id} $conditionString"
+  override lazy val canonicalized: Expression = {
+    LateralSubquery(
+      plan.canonicalized,
+      outerAttrs.map(_.canonicalized),
+      ExprId(0),
+      joinCond.map(_.canonicalized))
+  }
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): LateralSubquery =
+    copy(
+      outerAttrs = newChildren.take(outerAttrs.size),
+      joinCond = newChildren.drop(outerAttrs.size))
+  final override def nodePatternsInternal: Seq[TreePattern] = Seq(LATERAL_SUBQUERY)
+}
 /**
  * A [[ListQuery]] expression defines the query which we want to search in an IN subquery
  * expression. It should and can only be used in conjunction with an IN expression.


@@ -217,36 +217,42 @@ object DecorrelateInnerQuery extends PredicateHelper {
   def rewriteDomainJoins(
       outerPlan: LogicalPlan,
       innerPlan: LogicalPlan,
-      conditions: Seq[Expression]): LogicalPlan = {
-    innerPlan transform {
-      case d @ DomainJoin(domainAttrs, child) =>
-        val domainAttrMap = buildDomainAttrMap(conditions, domainAttrs)
-        // We should only rewrite a domain join when all corresponding outer plan attributes
-        // can be found from the join condition.
-        if (domainAttrMap.size == domainAttrs.size) {
-          val groupingExprs = domainAttrs.map(domainAttrMap)
-          val aggregateExprs = groupingExprs.zip(domainAttrs).map {
-            // Rebuild the aliases.
-            case (inputAttr, outputAttr) => Alias(inputAttr, outputAttr.name)(outputAttr.exprId)
-          }
-          // Construct a domain with the outer query plan.
-          // DomainJoin [a', b'] => Aggregate [a, b] [a AS a', b AS b']
-          // +- Relation [a, b]
-          val domain = Aggregate(groupingExprs, aggregateExprs, outerPlan)
-          child match {
-            // A special optimization for OneRowRelation.
-            // TODO: add a more general rule to optimize join with OneRowRelation.
-            case _: OneRowRelation => domain
-            // Construct a domain join.
-            // Join Inner
-            // :- Inner Query
-            // +- Domain
-            case _ => Join(child, domain, Inner, None, JoinHint.NONE)
-          }
-        } else {
-          throw QueryExecutionErrors.cannotRewriteDomainJoinWithConditionsError(conditions, d)
-        }
-    }
+      conditions: Seq[Expression]): LogicalPlan = innerPlan match {
+    case d @ DomainJoin(domainAttrs, child) =>
+      val domainAttrMap = buildDomainAttrMap(conditions, domainAttrs)
+      // We should only rewrite a domain join when all corresponding outer plan attributes
+      // can be found from the join condition.
+      if (domainAttrMap.size == domainAttrs.size) {
+        val groupingExprs = domainAttrs.map(domainAttrMap)
+        val aggregateExprs = groupingExprs.zip(domainAttrs).map {
+          // Rebuild the aliases.
+          case (inputAttr, outputAttr) => Alias(inputAttr, outputAttr.name)(outputAttr.exprId)
+        }
+        // Construct a domain with the outer query plan.
+        // DomainJoin [a', b'] => Aggregate [a, b] [a AS a', b AS b']
+        // +- Relation [a, b]
+        val domain = Aggregate(groupingExprs, aggregateExprs, outerPlan)
+        child match {
+          // A special optimization for OneRowRelation.
+          // TODO: add a more general rule to optimize join with OneRowRelation.
+          case _: OneRowRelation => domain
+          // Construct a domain join.
+          // Join Inner
+          // :- Inner Query
+          // +- Domain
+          case _ =>
+            // The decorrelation framework adds domain joins by traversing down the plan tree
+            // recursively until it reaches a node that is not correlated with the outer query.
+            // So the child node of a domain join shouldn't contain another domain join.
+            assert(child.find(_.isInstanceOf[DomainJoin]).isEmpty,
+              s"Child of a domain join shouldn't contain another domain join.\n$child")
+            Join(child, domain, Inner, None, JoinHint.NONE)
+        }
+      } else {
+        throw QueryExecutionErrors.cannotRewriteDomainJoinWithConditionsError(conditions, d)
+      }
+    case p: LogicalPlan =>
+      p.mapChildren(rewriteDomainJoins(outerPlan, _, conditions))
   }
   def apply(
@@ -470,7 +476,9 @@ object DecorrelateInnerQuery extends PredicateHelper {
           case u: UnaryNode =>
             val outerReferences = collectOuterReferences(u.expressions)
             assert(outerReferences.isEmpty, s"Correlated column is not allowed in $u")
-            decorrelate(u.child, parentOuterReferences, aggregated)
+            val (newChild, joinCond, outerReferenceMap) =
+              decorrelate(u.child, parentOuterReferences, aggregated)
+            (u.withNewChildren(newChild :: Nil), joinCond, outerReferenceMap)
           case o =>
             throw QueryExecutionErrors.decorrelateInnerQueryThroughPlanUnsupportedError(o)


@@ -112,6 +112,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
         SimplifyCasts,
         SimplifyCaseConversionExpressions,
         RewriteCorrelatedScalarSubquery,
+        RewriteLateralSubquery,
         EliminateSerialization,
         RemoveRedundantAliases,
         RemoveRedundantAggregates,


@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.trees.TreePattern.{EXISTS_SUBQUERY, FILTER, IN_SUBQUERY, LIST_SUBQUERY, SCALAR_SUBQUERY}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{EXISTS_SUBQUERY, FILTER, IN_SUBQUERY, LATERAL_JOIN, LIST_SUBQUERY, PLAN_EXPRESSION, SCALAR_SUBQUERY}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -304,8 +304,7 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
       }
     }
-    plan.transformExpressionsWithPruning(_.containsAnyPattern(
-      SCALAR_SUBQUERY, EXISTS_SUBQUERY, LIST_SUBQUERY)) {
+    plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case ScalarSubquery(sub, children, exprId, conditions) if children.nonEmpty =>
         val (newPlan, newCond) = decorrelate(sub, outerPlans)
         ScalarSubquery(newPlan, children, exprId, getJoinCondition(newCond, conditions))
@@ -315,6 +314,9 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
       case ListQuery(sub, children, exprId, childOutputs, conditions) if children.nonEmpty =>
         val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans)
         ListQuery(newPlan, children, exprId, childOutputs, getJoinCondition(newCond, conditions))
+      case LateralSubquery(sub, children, exprId, conditions) if children.nonEmpty =>
+        val (newPlan, newCond) = decorrelate(sub, outerPlans)
+        LateralSubquery(newPlan, children, exprId, getJoinCondition(newCond, conditions))
     }
   }
@@ -322,9 +324,19 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
    * Pull up the correlated predicates and rewrite all subqueries in an operator tree..
    */
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
-    _.containsAnyPattern(SCALAR_SUBQUERY, EXISTS_SUBQUERY, LIST_SUBQUERY)) {
+    _.containsPattern(PLAN_EXPRESSION)) {
     case f @ Filter(_, a: Aggregate) =>
       rewriteSubQueries(f, Seq(a, a.child))
+    case j: LateralJoin =>
+      val newPlan = rewriteSubQueries(j, j.children)
+      // Since a lateral join's output depends on its left child output and its lateral subquery's
+      // plan output, we need to trim the domain attributes added to the subquery's plan output
+      // to preserve the original output of the join.
+      if (!j.sameOutput(newPlan)) {
+        Project(j.output, newPlan)
+      } else {
+        newPlan
+      }
     // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
     case q: UnaryNode =>
       rewriteSubQueries(q, q.children)
@@ -677,3 +689,17 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
       }
   }
 }
+/**
+ * This rule rewrites [[LateralSubquery]] expressions into joins.
+ */
+object RewriteLateralSubquery extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
+    _.containsPattern(LATERAL_JOIN)) {
+    case LateralJoin(left, LateralSubquery(sub, _, _, joinCond), joinType, condition) =>
+      // TODO(SPARK-35551): handle the COUNT bug
+      val newRight = DecorrelateInnerQuery.rewriteDomainJoins(left, sub, joinCond)
+      val newCond = (condition ++ joinCond).reduceOption(And)
+      Join(left, newRight, joinType, newCond, JoinHint.NONE)
+  }
+}
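
The way `RewriteLateralSubquery` folds the optional join condition and the pulled-up correlated predicates into a single join condition can be sketched outside Spark. The `Expr`, `Pred`, and `And` types and the `combine` helper below are hypothetical stand-ins for illustration, not Catalyst classes.

```scala
object JoinConditionSketch {
  // Minimal stand-ins for Catalyst expressions (hypothetical, not Spark classes).
  sealed trait Expr
  final case class Pred(sql: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Models `(condition ++ joinCond).reduceOption(And)`: the explicit ON
  // condition comes first, followed by the pulled-up correlated predicates.
  // The result is None when there is nothing to join on.
  def combine(condition: Option[Expr], joinCond: Seq[Expr]): Option[Expr] =
    (condition.toSeq ++ joinCond).reduceOption[Expr](And(_, _))
}
```

A `None` result corresponds to a join without a condition, and multiple predicates are combined left to right into nested conjunctions.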


@@ -871,7 +871,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
     val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
       val right = plan(relation.relationPrimary)
-      val join = right.optionalMap(left)(Join(_, _, Inner, None, JoinHint.NONE))
+      val join = right.optionalMap(left) { (left, right) =>
+        if (relation.LATERAL != null) {
+          LateralJoin(left, LateralSubquery(right), Inner, None)
+        } else {
+          Join(left, right, Inner, None, JoinHint.NONE)
+        }
+      }
       withJoinRelations(join, relation)
     }
     if (ctx.pivotClause() != null) {
@@ -1127,12 +1133,18 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
         // Resolve the join type and join condition
         val (joinType, condition) = Option(join.joinCriteria) match {
           case Some(c) if c.USING != null =>
+            if (join.LATERAL != null) {
+              throw QueryParsingErrors.lateralJoinWithUsingJoinUnsupportedError(ctx)
+            }
             (UsingJoin(baseJoinType, visitIdentifierList(c.identifierList)), None)
           case Some(c) if c.booleanExpression != null =>
             (baseJoinType, Option(expression(c.booleanExpression)))
           case Some(c) =>
             throw QueryParsingErrors.joinCriteriaUnimplementedError(c, ctx)
           case None if join.NATURAL != null =>
+            if (join.LATERAL != null) {
+              throw QueryParsingErrors.lateralJoinWithNaturalJoinUnsupportedError(ctx)
+            }
             if (baseJoinType == Cross) {
               throw QueryParsingErrors.naturalCrossJoinUnsupportedError(ctx)
             }
@@ -1140,7 +1152,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
           case None =>
             (baseJoinType, None)
         }
-        Join(left, plan(join.right), joinType, condition, JoinHint.NONE)
+        if (join.LATERAL != null) {
+          if (!Seq(Inner, Cross, LeftOuter).contains(joinType)) {
+            throw QueryParsingErrors.unsupportedLateralJoinTypeError(ctx, joinType.toString)
+          }
+          LateralJoin(left, LateralSubquery(plan(join.right)), joinType, condition)
+        } else {
+          Join(left, plan(join.right), joinType, condition, JoinHint.NONE)
+        }
       }
     }
   }
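
The restrictions that the parser changes above place on `LATERAL` joins can be summarized in a small standalone sketch. Everything here is hypothetical: the `JoinType` objects mimic, but are not, the Catalyst join types, `checkLateral` is an invented helper, and the error strings are illustrative rather than the messages Spark produces.

```scala
object LateralParseRulesSketch {
  // Hypothetical stand-ins for Catalyst join types; not the Spark classes.
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Returns Right(joinType) when the combination is accepted for a LATERAL
  // join, or Left(reason) when the parser in this commit would reject it.
  def checkLateral(
      joinType: JoinType,
      hasUsing: Boolean,
      isNatural: Boolean): Either[String, JoinType] =
    if (hasUsing) Left("LATERAL cannot be combined with USING")
    else if (isNatural) Left("LATERAL cannot be combined with NATURAL")
    else if (!Seq[JoinType](Inner, Cross, LeftOuter).contains(joinType))
      Left(s"Unsupported LATERAL join type $joinType")
    else Right(joinType)
}
```

Only inner, cross, and left outer joins pass the check, which matches the `require` on the `LateralJoin` node added in this commit.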


@@ -45,11 +45,20 @@ object PlanHelper {
         case e: AggregateExpression
           if !(plan.isInstanceOf[Aggregate] ||
                plan.isInstanceOf[Window] ||
-               plan.isInstanceOf[CollectMetrics]) => e
+               plan.isInstanceOf[CollectMetrics] ||
+               onlyInLateralSubquery(plan)) => e
         case e: Generator
           if !plan.isInstanceOf[Generate] => e
       }
     }
     invalidExpressions
   }
+  private def onlyInLateralSubquery(plan: LogicalPlan): Boolean = {
+    lazy val noAggInJoinCond = {
+      val join = plan.asInstanceOf[LateralJoin]
+      !(join.condition ++ join.right.joinCond).exists(AggregateExpression.containsAggregate)
+    }
+    plan.isInstanceOf[LateralJoin] && noAggInJoinCond
+  }
 }


@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.AliasIdentifier
-import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, TypeCoercion, TypeCoercionBase}
+import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions._
@@ -1419,3 +1419,60 @@ case class DomainJoin(domainAttrs: Seq[Attribute], child: LogicalPlan) extends U
   override protected def withNewChildInternal(newChild: LogicalPlan): DomainJoin =
     copy(child = newChild)
 }
+/**
+ * A logical plan for lateral join.
+ */
+case class LateralJoin(
+    left: LogicalPlan,
+    right: LateralSubquery,
+    joinType: JoinType,
+    condition: Option[Expression]) extends UnaryNode {
+  require(Seq(Inner, LeftOuter, Cross).contains(joinType),
+    s"Unsupported lateral join type $joinType")
+  override def child: LogicalPlan = left
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter => left.output ++ right.plan.output.map(_.withNullability(true))
+      case _ => left.output ++ right.plan.output
+    }
+  }
+  private[this] lazy val childAttributes = AttributeSeq(left.output ++ right.plan.output)
+  private[this] lazy val childMetadataAttributes =
+    AttributeSeq(left.metadataOutput ++ right.plan.metadataOutput)
+  /**
+   * Optionally resolves the given strings to a [[NamedExpression]] using the input from
+   * both the left plan and the lateral subquery's plan.
+   */
+  override def resolveChildren(
+      nameParts: Seq[String],
+      resolver: Resolver): Option[NamedExpression] = {
+    childAttributes.resolve(nameParts, resolver)
+      .orElse(childMetadataAttributes.resolve(nameParts, resolver))
+  }
+  override def childrenResolved: Boolean = left.resolved && right.resolved
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.plan.outputSet).isEmpty
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      condition.forall(_.dataType == BooleanType)
+  }
+  override def producedAttributes: AttributeSet = AttributeSet(right.plan.output)
+  final override val nodePatterns: Seq[TreePattern] = Seq(LATERAL_JOIN)
+  override protected def withNewChildInternal(newChild: LogicalPlan): LateralJoin = {
+    copy(left = newChild)
+  }
+}
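
The `LateralJoin` node above accepts only `Inner`, `LeftOuter`, and `Cross`, and marks the right-side output as nullable for `LeftOuter`. The following is a minimal sketch of the dependent (nested loop) semantics behind that choice, written against plain Scala collections rather than Catalyst. `LateralJoinSketch`, its local `JoinType` hierarchy, and the `lateralJoin` helper are all hypothetical names used for illustration only; they are not part of this PR or of Spark.

```scala
// Toy, in-memory model of lateral join semantics; NOT Spark's implementation.
// All names here are hypothetical and exist only for this illustration.
object LateralJoinSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType

  // `right` plays the role of the lateral subquery: it is re-evaluated for
  // every left row and may return zero or more rows.
  def lateralJoin[L, R](left: Seq[L], joinType: JoinType)(
      right: L => Seq[R]): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val matches = right(l)
      joinType match {
        // LeftOuter keeps unmatched left rows and pads the right side with
        // None, which is why right-side attributes must become nullable.
        case LeftOuter if matches.isEmpty => Seq((l, Option.empty[R]))
        case _ => matches.map(r => (l, Some(r)))
      }
    }

  def main(args: Array[String]): Unit = {
    val t1 = Seq((0, 1), (1, 2)) // (c1, c2)
    val t2 = Seq((0, 2), (0, 3)) // (c1, c2)
    // Models: SELECT * FROM t1 [LEFT] JOIN LATERAL
    //           (SELECT c2 FROM t2 WHERE t1.c1 = t2.c1)
    val sub = (l: (Int, Int)) => t2.filter(_._1 == l._1).map(_._2)
    println(lateralJoin(t1, Inner)(sub))
    println(lateralJoin(t1, LeftOuter)(sub))
  }
}
```

With these inputs the inner variant drops the `t1` row `(1, 2)` because the subquery returns no rows for it, while the left outer variant keeps that row with an empty right side.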

@@ -57,6 +57,7 @@ object TreePattern extends Enumeration {
   val JSON_TO_STRUCT: Value = Value
   val LAMBDA_FUNCTION: Value = Value
   val LAMBDA_VARIABLE: Value = Value
+  val LATERAL_SUBQUERY: Value = Value
   val LIKE_FAMLIY: Value = Value
   val LIST_SUBQUERY: Value = Value
   val LITERAL: Value = Value
@@ -95,6 +96,7 @@ object TreePattern extends Enumeration {
   val FILTER: Value = Value
   val INNER_LIKE_JOIN: Value = Value
   val JOIN: Value = Value
+  val LATERAL_JOIN: Value = Value
   val LEFT_SEMI_OR_ANTI_JOIN: Value = Value
   val LIMIT: Value = Value
   val LOCAL_RELATION: Value = Value

@@ -1143,7 +1143,7 @@ object QueryExecutionErrors {
   def cannotRewriteDomainJoinWithConditionsError(
       conditions: Seq[Expression], d: DomainJoin): Throwable = {
-    new UnsupportedOperationException(
+    new IllegalStateException(
       s"Unable to rewrite domain join with conditions: $conditions\n$d")
   }

@@ -100,6 +100,18 @@ object QueryParsingErrors {
     new ParseException("LATERAL cannot be used together with PIVOT in FROM clause", ctx)
   }
+  def lateralJoinWithNaturalJoinUnsupportedError(ctx: ParserRuleContext): Throwable = {
+    new ParseException("LATERAL join with NATURAL join is not supported", ctx)
+  }
+  def lateralJoinWithUsingJoinUnsupportedError(ctx: ParserRuleContext): Throwable = {
+    new ParseException("LATERAL join with USING join is not supported", ctx)
+  }
+  def unsupportedLateralJoinTypeError(ctx: ParserRuleContext, joinType: String): Throwable = {
+    new ParseException(s"Unsupported LATERAL join type $joinType", ctx)
+  }
   def repetitiveWindowDefinitionError(name: String, ctx: WindowClauseContext): Throwable = {
     new ParseException(s"The definition of window '$name' is repetitive", ctx)
   }

@@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery}
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, InSubquery, LateralSubquery, ListQuery, OuterReference}
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
 /**
@@ -31,9 +32,20 @@ class ResolveSubquerySuite extends AnalysisTest {
   val a = 'a.int
   val b = 'b.int
   val c = 'c.int
-  val t1 = LocalRelation(a)
-  val t2 = LocalRelation(b)
+  val x = 'x.struct(a)
+  val y = 'y.struct(a)
+  val t0 = OneRowRelation()
+  val t1 = LocalRelation(a, b)
+  val t2 = LocalRelation(b, c)
   val t3 = LocalRelation(c)
+  val t4 = LocalRelation(x, y)
+  private def lateralJoin(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      joinType: JoinType = Inner,
+      condition: Option[Expression] = None): LateralJoin =
+    LateralJoin(left, LateralSubquery(right), joinType, condition)
   test("SPARK-17251 Improve `OuterReference` to be `NamedExpression`") {
     val expr = Filter(
@@ -53,4 +65,116 @@ class ResolveSubquerySuite extends AnalysisTest {
       JoinHint.NONE)
     assertAnalysisSuccess(expr)
   }
+  test("deduplicate lateral subquery") {
+    val plan = lateralJoin(t1, t0.select('a))
+    // The subquery's output OuterReference(a#0) conflicts with the left child output
+    // attribute a#0. So an alias should be added to deduplicate the subquery's outputs.
+    val expected = LateralJoin(
+      t1,
+      LateralSubquery(Project(Seq(OuterReference(a).as(a.name)), t0), Seq(a)),
+      Inner,
+      None)
+    checkAnalysis(plan, expected)
+  }
+  test("lateral join with ambiguous join conditions") {
+    val plan = lateralJoin(t1, t0.select('b), condition = Some('b === 1))
+    assertAnalysisError(plan, "Reference 'b' is ambiguous, could be: b, b." :: Nil)
+  }
+  test("prefer resolving lateral subquery attributes from the inner query") {
+    val plan = lateralJoin(t1, t2.select('a, 'b, 'c))
+    val expected = LateralJoin(
+      t1,
+      LateralSubquery(Project(Seq(OuterReference(a).as(a.name), b, c), t2), Seq(a)),
+      Inner, None)
+    checkAnalysis(plan, expected)
+  }
+  test("qualified column names in lateral subquery") {
+    val t1b = b.withQualifier(Seq("t1"))
+    val t2b = b.withQualifier(Seq("t2"))
+    checkAnalysis(
+      lateralJoin(t1.as("t1"), t0.select($"t1.b")),
+      LateralJoin(
+        t1,
+        LateralSubquery(Project(Seq(OuterReference(t1b).as(b.name)), t0), Seq(t1b)),
+        Inner, None)
+    )
+    checkAnalysis(
+      lateralJoin(t1.as("t1"), t2.as("t2").select($"t1.b", $"t2.b")),
+      LateralJoin(
+        t1,
+        LateralSubquery(Project(Seq(OuterReference(t1b).as(b.name), t2b), t2.as("t2")), Seq(t1b)),
+        Inner, None)
+    )
+  }
+  test("resolve nested lateral subqueries") {
+    // SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT a, b, c FROM t2), LATERAL (SELECT b, c))
+    checkAnalysis(
+      lateralJoin(t1, lateralJoin(t2.select('a, 'b, 'c), t0.select('b, 'c))),
+      LateralJoin(
+        t1,
+        LateralSubquery(
+          LateralJoin(
+            Project(Seq(OuterReference(a).as(a.name), b, c), t2),
+            LateralSubquery(
+              Project(Seq(OuterReference(b).as(b.name), OuterReference(c).as(c.name)), t0),
+              Seq(b, c)),
+            Inner, None),
+          Seq(a)),
+        Inner, None)
+    )
+    // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT a, b, c))
+    // TODO: support accessing columns from outer outer query.
+    assertAnalysisError(
+      lateralJoin(t1, lateralJoin(t2, t0.select('a, 'b, 'c))),
+      Seq("cannot resolve 'a' given input columns: []"))
+  }
+  test("lateral subquery with unresolvable attributes") {
+    // SELECT * FROM t1, LATERAL (SELECT a, c)
+    assertAnalysisError(
+      lateralJoin(t1, t0.select('a, 'c)),
+      Seq("cannot resolve 'c' given input columns: []")
+    )
+    // SELECT * FROM t1, LATERAL (SELECT a, b, c, d FROM t2)
+    assertAnalysisError(
+      lateralJoin(t1, t2.select('a, 'b, 'c, 'd)),
+      Seq("cannot resolve 'd' given input columns: [b, c]")
+    )
+    // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT t1.a))
+    assertAnalysisError(
+      lateralJoin(t1, lateralJoin(t2, t0.select($"t1.a"))),
+      Seq("cannot resolve 't1.a' given input columns: []")
+    )
+    // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT a, b))
+    assertAnalysisError(
+      lateralJoin(t1, lateralJoin(t2, t0.select('a, 'b))),
+      Seq("cannot resolve 'a' given input columns: []")
+    )
+  }
+  test("lateral subquery with struct type") {
+    val xa = GetStructField(OuterReference(x), 0, Some("a")).as(a.name)
+    val ya = GetStructField(OuterReference(y), 0, Some("a")).as(a.name)
+    checkAnalysis(
+      lateralJoin(t4, t0.select($"x.a", $"y.a")),
+      LateralJoin(t4, LateralSubquery(Project(Seq(xa, ya), t0), Seq(x, y)), Inner, None)
+    )
+    // Analyzer will try to resolve struct first before subquery alias.
+    assertAnalysisError(
+      lateralJoin(t1.as("x"), t4.select($"x.a", $"x.b")),
+      Seq("No such struct field b in a")
+    )
+  }
+  test("lateral join with unsupported expressions") {
+    val plan = lateralJoin(t1, t0.select(('a + 'b).as("c")),
+      condition = Some(sum('a) === sum('c)))
+    assertAnalysisError(plan, Seq("Invalid expressions: [sum(a), sum(c)]"))
+  }
 }

@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LateralJoin, LocalRelation, LogicalPlan, MergeIntoTable, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 class PullupCorrelatedPredicatesSuite extends PlanTest {
@@ -99,6 +99,18 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
     comparePlans(optimized, doubleOptimized, false)
   }
+  test("PullupCorrelatedPredicates lateral join idempotency check") {
+    val right =
+      testRelation2
+        .where('b === 'd && 'd === 1)
+        .select('c)
+    val left = testRelation
+    val lateralJoin = LateralJoin(left, LateralSubquery(right), Inner, Some('a === 'c)).analyze
+    val optimized = Optimize.execute(lateralJoin)
+    val doubleOptimized = Optimize.execute(optimized)
+    comparePlans(optimized, doubleOptimized)
+  }
   test("PullupCorrelatedPredicates should handle deletes") {
     val subPlan = testRelation2.where('a === 'c).select('c)
     val cond = InSubquery(Seq('a), ListQuery(subPlan))

@@ -472,21 +472,31 @@ class PlanParserSuite extends AnalysisTest {
         s"select * from t $sql u using(a, b)",
         table("t").join(table("u"), UsingJoin(jt, Seq("a", "b")), None).select(star()))
     }
-    val testAll = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin, testUsingJoin)
+    val testLateralJoin = (sql: String, jt: JoinType) => {
+      assertEqual(
+        s"select * from t $sql lateral (select * from u) uu",
+        LateralJoin(
+          table("t"),
+          LateralSubquery(table("u").select(star()).as("uu")),
+          jt, None).select(star()))
+    }
+    val testAllExceptLateral = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin,
+      testUsingJoin)
+    val testAll = testAllExceptLateral :+ testLateralJoin
     val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, testUsingJoin)
     def test(sql: String, jt: JoinType, tests: Seq[(String, JoinType) => Unit]): Unit = {
       tests.foreach(_(sql, jt))
     }
-    test("cross join", Cross, Seq(testUnconditionalJoin))
-    test(",", Inner, Seq(testUnconditionalJoin))
+    test("cross join", Cross, Seq(testUnconditionalJoin, testLateralJoin))
+    test(",", Inner, Seq(testUnconditionalJoin, testLateralJoin))
     test("join", Inner, testAll)
     test("inner join", Inner, testAll)
     test("left join", LeftOuter, testAll)
     test("left outer join", LeftOuter, testAll)
-    test("right join", RightOuter, testAll)
-    test("right outer join", RightOuter, testAll)
-    test("full join", FullOuter, testAll)
-    test("full outer join", FullOuter, testAll)
+    test("right join", RightOuter, testAllExceptLateral)
+    test("right outer join", RightOuter, testAllExceptLateral)
+    test("full join", FullOuter, testAllExceptLateral)
+    test("full outer join", FullOuter, testAllExceptLateral)
     test("left semi join", LeftSemi, testExistence)
     test("semi join", LeftSemi, testExistence)
     test("left anti join", LeftAnti, testExistence)
@@ -540,6 +550,26 @@ class PlanParserSuite extends AnalysisTest {
         .join(table("t3"))
         .join(table("t2"), Inner, Option(Symbol("t1.col1") === Symbol("t2.col2")))
         .select(star()))
+    // Test lateral join with join conditions
+    assertEqual(
+      s"select * from t join lateral (select * from u) uu on true",
+      LateralJoin(
+        table("t"),
+        LateralSubquery(table("u").select(star()).as("uu")),
+        Inner, Option(true)).select(star()))
+    // Test multiple lateral joins
+    assertEqual(
+      "select * from a, lateral (select * from b) bb, lateral (select * from c) cc",
+      LateralJoin(
+        LateralJoin(
+          table("a"),
+          LateralSubquery(table("b").select(star()).as("bb")),
+          Inner, None),
+        LateralSubquery(table("c").select(star()).as("cc")),
+        Inner, None).select(star())
+    )
   }
   test("sampled relations") {

@@ -73,14 +73,20 @@ trait PlanTestBase extends PredicateHelper with SQLHelper with SQLConfHelper { s
     plan transformAllExpressions {
       case s: ScalarSubquery =>
         s.copy(plan = normalizeExprIds(s.plan), exprId = ExprId(0))
+      case s: LateralSubquery =>
+        s.copy(plan = normalizeExprIds(s.plan), exprId = ExprId(0))
       case e: Exists =>
         e.copy(exprId = ExprId(0))
       case l: ListQuery =>
         l.copy(exprId = ExprId(0))
       case a: AttributeReference =>
         AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
+      case OuterReference(a: AttributeReference) =>
+        OuterReference(AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0)))
       case a: Alias =>
         Alias(a.child, a.name)(exprId = ExprId(0))
+      case OuterReference(a: Alias) =>
+        OuterReference(Alias(a.child, a.name)(exprId = ExprId(0)))
       case ae: AggregateExpression =>
         ae.copy(resultId = ExprId(0))
       case lv: NamedLambdaVariable =>

@@ -0,0 +1,102 @@
-- Test cases for lateral join
CREATE VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2);
CREATE VIEW t2(c1, c2) AS VALUES (0, 2), (0, 3);
-- lateral join with single column select
SELECT * FROM t1, LATERAL (SELECT c1);
SELECT * FROM t1, LATERAL (SELECT c1 FROM t2);
SELECT * FROM t1, LATERAL (SELECT t1.c1 FROM t2);
SELECT * FROM t1, LATERAL (SELECT t1.c1 + t2.c1 FROM t2);
-- lateral join with star expansion
SELECT * FROM t1, LATERAL (SELECT *);
SELECT * FROM t1, LATERAL (SELECT * FROM t2);
-- TODO(SPARK-35618): resolve star expressions in subquery
-- SELECT * FROM t1, LATERAL (SELECT t1.*);
-- SELECT * FROM t1, LATERAL (SELECT t1.*, t2.* FROM t2);
-- lateral join with different join types
SELECT * FROM t1 JOIN LATERAL (SELECT c1 + c2 AS c3) ON c2 = c3;
SELECT * FROM t1 LEFT JOIN LATERAL (SELECT c1 + c2 AS c3) ON c2 = c3;
SELECT * FROM t1 CROSS JOIN LATERAL (SELECT c1 + c2 AS c3);
SELECT * FROM t1 NATURAL JOIN LATERAL (SELECT c1 + c2 AS c2);
SELECT * FROM t1 JOIN LATERAL (SELECT c1 + c2 AS c2) USING (c2);
-- lateral join without outer column references
SELECT * FROM LATERAL (SELECT * FROM t1);
SELECT * FROM t1, LATERAL (SELECT * FROM t2);
SELECT * FROM LATERAL (SELECT * FROM t1), LATERAL (SELECT * FROM t2);
SELECT * FROM LATERAL (SELECT * FROM t1) JOIN LATERAL (SELECT * FROM t2);
-- lateral join with subquery alias
SELECT a, b FROM t1, LATERAL (SELECT c1, c2) s(a, b);
-- lateral join with foldable outer query references
SELECT * FROM (SELECT 1 AS c1, 2 AS c2), LATERAL (SELECT c1, c2);
-- lateral join with correlated equality predicates
SELECT * FROM t1, LATERAL (SELECT c2 FROM t2 WHERE t1.c1 = t2.c1);
-- lateral join with correlated non-equality predicates
SELECT * FROM t1, LATERAL (SELECT c2 FROM t2 WHERE t1.c2 < t2.c2);
-- lateral join can reference preceding FROM clause items
SELECT * FROM t1 JOIN t2 JOIN LATERAL (SELECT t1.c2 + t2.c2);
-- expect error: cannot resolve `t2.c1`
SELECT * FROM t1 JOIN LATERAL (SELECT t1.c1 AS a, t2.c1 AS b) s JOIN t2 ON s.b = t2.c1;
-- multiple lateral joins
SELECT * FROM t1,
LATERAL (SELECT c1 + c2 AS a),
LATERAL (SELECT c1 - c2 AS b),
LATERAL (SELECT a * b AS c);
-- lateral join in between regular joins
SELECT * FROM t1
LEFT OUTER JOIN LATERAL (SELECT c2 FROM t2 WHERE t1.c1 = t2.c1) s
LEFT OUTER JOIN t1 t3 ON s.c2 = t3.c2;
-- nested lateral joins
SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT c1));
SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT c1 + 1 AS c1), LATERAL (SELECT c1));
SELECT * FROM t1, LATERAL (
SELECT * FROM (SELECT c1, MIN(c2) m FROM t2 WHERE t1.c1 = t2.c1 GROUP BY c1) s,
LATERAL (SELECT m WHERE m > c1)
);
-- expect error: cannot resolve `t1.c1`
SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT t1.c1 + t2.c1));
-- expect error: cannot resolve `c2`
SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT c1), LATERAL (SELECT c2));
-- uncorrelated scalar subquery inside lateral join
SELECT * FROM t1, LATERAL (SELECT c2, (SELECT MIN(c2) FROM t2));
-- correlated scalar subquery inside lateral join
SELECT * FROM t1, LATERAL (SELECT (SELECT SUM(c2) FROM t2 WHERE c1 = a) FROM (SELECT c1 AS a));
-- expect error: cannot resolve `t1.c1`
SELECT * FROM t1, LATERAL (SELECT c1, (SELECT SUM(c2) FROM t2 WHERE c1 = t1.c1));
-- lateral join inside uncorrelated subquery
SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a));
-- lateral join inside correlated subquery
SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a) WHERE c1 = t1.c1);
-- TODO(SPARK-35551): handle the COUNT bug (the expected result should be (1, 2, 0))
SELECT * FROM t1, LATERAL (SELECT COUNT(*) AS cnt FROM t2 WHERE c1 = t1.c1) WHERE cnt = 0;
-- lateral subquery with group by
SELECT * FROM t1 LEFT JOIN LATERAL (SELECT MIN(c2) FROM t2 WHERE c1 = t1.c1 GROUP BY c1);
-- lateral join inside CTE
WITH cte1 AS (
SELECT c1 FROM t1
), cte2 AS (
SELECT s.* FROM cte1, LATERAL (SELECT * FROM t2 WHERE c1 = cte1.c1) s
)
SELECT * FROM cte2;
-- clean up
DROP VIEW t1;
DROP VIEW t2;
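
The `TODO(SPARK-35551)` query above notes that the expected result is `(1, 2, 0)`. The following is a rough sketch of why a decorrelated plan can miss that row, using plain Scala collections and the same `t1`/`t2` contents as the views in this file. `CountBugSketch`, `nestedLoop`, and `naiveDecorrelated` are hypothetical names, and the "naive" rewrite is a deliberately simplified model of pulling the correlated predicate up into an inner join, not Spark's actual optimizer rule.

```scala
// Toy illustration of the COUNT bug; plain Scala collections, NOT Spark code.
// All names here are hypothetical and exist only for this illustration.
object CountBugSketch {
  val t1: Seq[(Int, Int)] = Seq((0, 1), (1, 2)) // (c1, c2)
  val t2: Seq[(Int, Int)] = Seq((0, 2), (0, 3)) // (c1, c2)

  // Reference semantics: evaluate the subquery once per t1 row.
  // COUNT(*) over an empty input is 0, so every t1 row yields exactly one row.
  def nestedLoop: Seq[(Int, Int, Int)] =
    t1.map { case (c1, c2) => (c1, c2, t2.count(_._1 == c1)) }
      .filter(_._3 == 0)

  // Simplified decorrelation: aggregate t2 per c1 first, then inner-join on
  // c1. Values of c1 that never occur in t2 have no group, hence no row.
  def naiveDecorrelated: Seq[(Int, Int, Int)] = {
    val counts: Map[Int, Int] =
      t2.groupBy(_._1).map { case (k, rows) => k -> rows.size }
    for {
      (c1, c2) <- t1
      cnt <- counts.get(c1).toSeq
      if cnt == 0
    } yield (c1, c2, cnt)
  }

  def main(args: Array[String]): Unit = {
    println(nestedLoop)
    println(naiveDecorrelated)
  }
}
```

Under this model the per-row evaluation returns the single row `(1, 2, 0)`, while the join-based rewrite returns nothing, because `c1 = 1` has no group in `t2` to join against.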

@@ -340,7 +340,7 @@ select ten, sum(distinct four) filter (where string(four) like '123') from onek
 group by rollup(ten);
 -- More rescan tests
--- [SPARK-27877] ANSI SQL: LATERAL derived table(T491)
+-- [SPARK-35554] Support outer references in Aggregate
 -- select * from (values (1),(2)) v(a) left join lateral (select v.a, four, ten, count(*) from onek group by cube(four,ten)) s on true order by v.a,four,ten;
 -- [SPARK-27878] Support ARRAY(sub-SELECT) expressions
 -- select array(select row(v.a,s1.*) from (select two,four, count(*) from onek group by cube(two,four) order by two,four) s1) from (values (1),(2)) v(a);
@@ -499,14 +499,14 @@ SELECT a, b, count(*), max(a), max(b) FROM gstest3 GROUP BY GROUPING SETS(a, b,(
 -- COMMIT;
 -- More rescan tests
--- [SPARK-27877] ANSI SQL: LATERAL derived table(T491)
+-- [SPARK-35554] Support outer references in Aggregate
 -- select * from (values (1),(2)) v(a) left join lateral (select v.a, four, ten, count(*) from onek group by cube(four,ten)) s on true order by v.a,four,ten;
 -- [SPARK-27878] Support ARRAY(sub-SELECT) expressions
 -- select array(select row(v.a,s1.*) from (select two,four, count(*) from onek group by cube(two,four) order by two,four) s1) from (values (1),(2)) v(a);
 -- Rescan logic changes when there are no empty grouping sets, so test
 -- that too:
--- [SPARK-27877] ANSI SQL: LATERAL derived table(T491)
+-- [SPARK-35554] Support outer references in Aggregate
 -- select * from (values (1),(2)) v(a) left join lateral (select v.a, four, ten, count(*) from onek group by grouping sets(four,ten)) s on true order by v.a,four,ten;
 -- [SPARK-27878] Support ARRAY(sub-SELECT) expressions
 -- select array(select row(v.a,s1.*) from (select two,four, count(*) from onek group by grouping sets(two,four) order by two,four) s1) from (values (1),(2)) v(a);

@@ -19,6 +19,12 @@
 --CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
 --CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN
+CREATE OR REPLACE TEMPORARY VIEW INT2_TBL(f1) AS VALUES
+(smallint(trim('0 '))),
+(smallint(trim(' 1234 '))),
+(smallint(trim(' -1234'))),
+(smallint('32767')),
+(smallint('-32767'));
 CREATE OR REPLACE TEMPORARY VIEW INT4_TBL AS SELECT * FROM
 (VALUES (0), (123456), (-123456), (2147483647), (-2147483647))
 AS v(f1);
@@ -1617,22 +1623,21 @@ select uunique1 from
 --
 -- Test LATERAL
 --
--- select unique2, x.*
--- from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x;
+select unique2, x.*
+from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x;
 -- explain (costs off)
 -- select unique2, x.*
 -- from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x;
--- select unique2, x.*
--- from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss;
+select unique2, x.*
+from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss;
 -- explain (costs off)
 -- select unique2, x.*
 -- from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss;
 -- explain (costs off)
 -- select unique2, x.*
 -- from int4_tbl x cross join lateral (select unique2 from tenk1 where f1 = unique1) ss;
--- select unique2, x.*
--- from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true;
+select unique2, x.*
+from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true;
 -- explain (costs off)
 -- select unique2, x.*
 -- from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true;
@@ -1640,6 +1645,7 @@ select uunique1 from
 -- [SPARK-27877] ANSI SQL: LATERAL derived table(T491)
 -- check scoping of lateral versus parent references
 -- the first of these should return int8_tbl.q2, the second int8_tbl.q1
+-- Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses
 -- select *, (select r from (select q1 as q2) x, (select q2 as r) y) from int8_tbl;
 -- select *, (select r from (select q1 as q2) x, lateral (select q2 as r) y) from int8_tbl;
@@ -1688,12 +1694,12 @@ select uunique1 from
 -- order by a.q1, a.q2, x.q1, x.q2, ss.z;
 -- lateral reference to a join alias variable
--- select * from (select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
--- lateral (select x) ss2(y);
+select * from (select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
+lateral (select x) ss2(y);
 -- select * from (select f1 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
 -- lateral (values(x)) ss2(y);
--- select * from ((select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1) j,
--- lateral (select x) ss2(y);
+select * from ((select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1) j,
+lateral (select x) ss2(y);
 -- lateral references requiring pullup
 -- select * from (values(1)) x(lb),
@@ -1707,20 +1713,23 @@ select uunique1 from
 -- select * from
 -- int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
 -- lateral (values(x.q1,y.q1,y.q2)) v(xq1,yq1,yq2);
--- select * from
--- int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
--- lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
--- select x.* from
--- int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
--- lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
+select * from
+int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
+lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
+select x.* from
+int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
+lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
+-- Accessing outer query column is not allowed in Union
 -- select v.* from
 -- (int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1)
 -- left join int4_tbl z on z.f1 = x.q2,
 -- lateral (select x.q1,y.q1 union all select x.q2,y.q2) v(vx,vy);
+-- Accessing outer query column is not allowed in Union
 -- select v.* from
 -- (int8_tbl x left join (select q1,(select coalesce(q2,0)) q2 from int8_tbl) y on x.q2 = y.q1)
 -- left join int4_tbl z on z.f1 = x.q2,
 -- lateral (select x.q1,y.q1 union all select x.q2,y.q2) v(vx,vy);
+-- Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses
 -- select v.* from
 -- (int8_tbl x left join (select q1,(select coalesce(q2,0)) q2 from int8_tbl) y on x.q2 = y.q1)
 -- left join int4_tbl z on z.f1 = x.q2,
@@ -1730,24 +1739,24 @@ select uunique1 from
 -- select * from
 -- int8_tbl a left join
 -- lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1;
--- select * from
--- int8_tbl a left join
--- lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1;
+select * from
+int8_tbl a left join
+lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1;
 -- explain (verbose, costs off)
 -- select * from
 -- int8_tbl a left join
 -- lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1;
--- select * from
--- int8_tbl a left join
--- lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1;
+select * from
+int8_tbl a left join
+lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1;
 -- lateral can result in join conditions appearing below their
 -- real semantic level
 -- explain (verbose, costs off)
 -- select * from int4_tbl i left join
 -- lateral (select * from int2_tbl j where i.f1 = j.f1) k on true;
--- select * from int4_tbl i left join
--- lateral (select * from int2_tbl j where i.f1 = j.f1) k on true;
+select * from int4_tbl i left join
+lateral (select * from int2_tbl j where i.f1 = j.f1) k on true;
 -- explain (verbose, costs off)
 -- select * from int4_tbl i left join
 -- lateral (select coalesce(i) from int2_tbl j where i.f1 = j.f1) k on true;
@@ -1770,11 +1779,11 @@ select uunique1 from
 -- (select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
 -- int8_tbl b cross join int8_tbl c) ss
 -- on a.q2 = ss.bq1;
--- select * from
--- int8_tbl a left join lateral
--- (select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
--- int8_tbl b cross join int8_tbl c) ss
--- on a.q2 = ss.bq1;
+select * from
+int8_tbl a left join lateral
+(select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
+int8_tbl b cross join int8_tbl c) ss
+on a.q2 = ss.bq1;
 -- case requiring nested PlaceHolderVars
 -- explain (verbose, costs off)


@@ -11,6 +11,12 @@
-- Disable BroadcastHashJoin optimization to avoid changing result order when we enable AQE
--SET spark.sql.autoBroadcastJoinThreshold = -1
+ CREATE OR REPLACE TEMPORARY VIEW INT2_TBL(f1) AS VALUES
+ (smallint(trim('0 '))),
+ (smallint(trim(' 1234 '))),
+ (smallint(trim(' -1234'))),
+ (smallint('32767')),
+ (smallint('-32767'));
CREATE OR REPLACE TEMPORARY VIEW INT4_TBL AS SELECT * FROM
(VALUES (0), (123456), (-123456), (2147483647), (-2147483647))
AS v(f1);
@@ -1605,21 +1611,21 @@ select udf(uunique1) from
-- Test LATERAL
--
- -- select unique2, x.*
- -- from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x;
+ select unique2, x.*
+ from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x;
-- explain (costs off)
-- select unique2, x.*
-- from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x;
- -- select unique2, x.*
- -- from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss;
+ select unique2, x.*
+ from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss;
-- explain (costs off)
-- select unique2, x.*
-- from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss;
-- explain (costs off)
-- select unique2, x.*
-- from int4_tbl x cross join lateral (select unique2 from tenk1 where f1 = unique1) ss;
- -- select unique2, x.*
- -- from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true;
+ select unique2, x.*
+ from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true;
-- explain (costs off)
-- select unique2, x.*
-- from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true;
@@ -1675,12 +1681,12 @@ select udf(uunique1) from
-- order by a.q1, a.q2, x.q1, x.q2, ss.z;
-- lateral reference to a join alias variable
- -- select * from (select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
- -- lateral (select x) ss2(y);
+ select * from (select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
+ lateral (select x) ss2(y);
-- select * from (select f1 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
-- lateral (values(x)) ss2(y);
- -- select * from ((select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1) j,
- -- lateral (select x) ss2(y);
+ select * from ((select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1) j,
+ lateral (select x) ss2(y);
-- lateral references requiring pullup
-- select * from (values(1)) x(lb),
@@ -1694,12 +1700,12 @@ select udf(uunique1) from
-- select * from
-- int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
-- lateral (values(x.q1,y.q1,y.q2)) v(xq1,yq1,yq2);
- -- select * from
- -- int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
- -- lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
- -- select x.* from
- -- int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
- -- lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
+ select * from
+ int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
+ lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
+ select x.* from
+ int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
+ lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2);
-- select v.* from
-- (int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1)
-- left join int4_tbl z on z.f1 = x.q2,
@@ -1717,24 +1723,24 @@ select udf(uunique1) from
-- select * from
-- int8_tbl a left join
-- lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1;
- -- select * from
- -- int8_tbl a left join
- -- lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1;
+ select * from
+ int8_tbl a left join
+ lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1;
-- explain (verbose, costs off)
-- select * from
-- int8_tbl a left join
-- lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1;
- -- select * from
- -- int8_tbl a left join
- -- lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1;
+ select * from
+ int8_tbl a left join
+ lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1;
-- lateral can result in join conditions appearing below their
-- real semantic level
-- explain (verbose, costs off)
-- select * from int4_tbl i left join
-- lateral (select * from int2_tbl j where i.f1 = j.f1) k on true;
- -- select * from int4_tbl i left join
- -- lateral (select * from int2_tbl j where i.f1 = j.f1) k on true;
+ select * from int4_tbl i left join
+ lateral (select * from int2_tbl j where i.f1 = j.f1) k on true;
-- explain (verbose, costs off)
-- select * from int4_tbl i left join
-- lateral (select coalesce(i) from int2_tbl j where i.f1 = j.f1) k on true;
@@ -1757,11 +1763,11 @@ select udf(uunique1) from
-- (select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
-- int8_tbl b cross join int8_tbl c) ss
-- on a.q2 = ss.bq1;
- -- select * from
- -- int8_tbl a left join lateral
- -- (select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
- -- int8_tbl b cross join int8_tbl c) ss
- -- on a.q2 = ss.bq1;
+ select * from
+ int8_tbl a left join lateral
+ (select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
+ int8_tbl b cross join int8_tbl c) ss
+ on a.q2 = ss.bq1;
-- case requiring nested PlaceHolderVars
-- explain (verbose, costs off)


@@ -0,0 +1,395 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 40
-- !query
CREATE VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2)
-- !query schema
struct<>
-- !query output
-- !query
CREATE VIEW t2(c1, c2) AS VALUES (0, 2), (0, 3)
-- !query schema
struct<>
-- !query output
-- !query
SELECT * FROM t1, LATERAL (SELECT c1)
-- !query schema
struct<c1:int,c2:int,c1:int>
-- !query output
0 1 0
1 2 1
-- !query
SELECT * FROM t1, LATERAL (SELECT c1 FROM t2)
-- !query schema
struct<c1:int,c2:int,c1:int>
-- !query output
0 1 0
0 1 0
1 2 0
1 2 0
-- !query
SELECT * FROM t1, LATERAL (SELECT t1.c1 FROM t2)
-- !query schema
struct<c1:int,c2:int,c1:int>
-- !query output
0 1 0
0 1 0
1 2 1
1 2 1
-- !query
SELECT * FROM t1, LATERAL (SELECT t1.c1 + t2.c1 FROM t2)
-- !query schema
struct<c1:int,c2:int,(outer(spark_catalog.default.t1.c1) + c1):int>
-- !query output
0 1 0
0 1 0
1 2 1
1 2 1
-- !query
SELECT * FROM t1, LATERAL (SELECT *)
-- !query schema
struct<c1:int,c2:int>
-- !query output
0 1
1 2
-- !query
SELECT * FROM t1, LATERAL (SELECT * FROM t2)
-- !query schema
struct<c1:int,c2:int,c1:int,c2:int>
-- !query output
0 1 0 2
0 1 0 3
1 2 0 2
1 2 0 3
-- !query
SELECT * FROM t1 JOIN LATERAL (SELECT c1 + c2 AS c3) ON c2 = c3
-- !query schema
struct<c1:int,c2:int,c3:int>
-- !query output
0 1 1
-- !query
SELECT * FROM t1 LEFT JOIN LATERAL (SELECT c1 + c2 AS c3) ON c2 = c3
-- !query schema
struct<c1:int,c2:int,c3:int>
-- !query output
0 1 1
1 2 NULL
-- !query
SELECT * FROM t1 CROSS JOIN LATERAL (SELECT c1 + c2 AS c3)
-- !query schema
struct<c1:int,c2:int,c3:int>
-- !query output
0 1 1
1 2 3
-- !query
SELECT * FROM t1 NATURAL JOIN LATERAL (SELECT c1 + c2 AS c2)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException
LATERAL join with NATURAL join is not supported(line 1, pos 14)
== SQL ==
SELECT * FROM t1 NATURAL JOIN LATERAL (SELECT c1 + c2 AS c2)
--------------^^^
-- !query
SELECT * FROM t1 JOIN LATERAL (SELECT c1 + c2 AS c2) USING (c2)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException
LATERAL join with USING join is not supported(line 1, pos 14)
== SQL ==
SELECT * FROM t1 JOIN LATERAL (SELECT c1 + c2 AS c2) USING (c2)
--------------^^^
-- !query
SELECT * FROM LATERAL (SELECT * FROM t1)
-- !query schema
struct<c1:int,c2:int>
-- !query output
0 1
1 2
-- !query
SELECT * FROM t1, LATERAL (SELECT * FROM t2)
-- !query schema
struct<c1:int,c2:int,c1:int,c2:int>
-- !query output
0 1 0 2
0 1 0 3
1 2 0 2
1 2 0 3
-- !query
SELECT * FROM LATERAL (SELECT * FROM t1), LATERAL (SELECT * FROM t2)
-- !query schema
struct<c1:int,c2:int,c1:int,c2:int>
-- !query output
0 1 0 2
0 1 0 3
1 2 0 2
1 2 0 3
-- !query
SELECT * FROM LATERAL (SELECT * FROM t1) JOIN LATERAL (SELECT * FROM t2)
-- !query schema
struct<c1:int,c2:int,c1:int,c2:int>
-- !query output
0 1 0 2
0 1 0 3
1 2 0 2
1 2 0 3
-- !query
SELECT a, b FROM t1, LATERAL (SELECT c1, c2) s(a, b)
-- !query schema
struct<a:int,b:int>
-- !query output
0 1
1 2
-- !query
SELECT * FROM (SELECT 1 AS c1, 2 AS c2), LATERAL (SELECT c1, c2)
-- !query schema
struct<c1:int,c2:int,c1:int,c2:int>
-- !query output
1 2 1 2
-- !query
SELECT * FROM t1, LATERAL (SELECT c2 FROM t2 WHERE t1.c1 = t2.c1)
-- !query schema
struct<c1:int,c2:int,c2:int>
-- !query output
0 1 2
0 1 3
-- !query
SELECT * FROM t1, LATERAL (SELECT c2 FROM t2 WHERE t1.c2 < t2.c2)
-- !query schema
struct<c1:int,c2:int,c2:int>
-- !query output
0 1 2
0 1 3
1 2 3
-- !query
SELECT * FROM t1 JOIN t2 JOIN LATERAL (SELECT t1.c2 + t2.c2)
-- !query schema
struct<c1:int,c2:int,c1:int,c2:int,(outer(spark_catalog.default.t1.c2) + outer(spark_catalog.default.t2.c2)):int>
-- !query output
0 1 0 2 3
0 1 0 3 4
1 2 0 2 4
1 2 0 3 5
-- !query
SELECT * FROM t1 JOIN LATERAL (SELECT t1.c1 AS a, t2.c1 AS b) s JOIN t2 ON s.b = t2.c1
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 't2.c1' given input columns: []; line 1 pos 50
-- !query
SELECT * FROM t1,
LATERAL (SELECT c1 + c2 AS a),
LATERAL (SELECT c1 - c2 AS b),
LATERAL (SELECT a * b AS c)
-- !query schema
struct<c1:int,c2:int,a:int,b:int,c:int>
-- !query output
0 1 1 -1 -1
1 2 3 -1 -3
-- !query
SELECT * FROM t1
LEFT OUTER JOIN LATERAL (SELECT c2 FROM t2 WHERE t1.c1 = t2.c1) s
LEFT OUTER JOIN t1 t3 ON s.c2 = t3.c2
-- !query schema
struct<c1:int,c2:int,c2:int,c1:int,c2:int>
-- !query output
0 1 2 1 2
0 1 3 NULL NULL
1 2 NULL NULL NULL
-- !query
SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT c1))
-- !query schema
struct<c1:int,c2:int,c1:int,c2:int,c1:int>
-- !query output
0 1 0 2 0
0 1 0 3 0
1 2 0 2 0
1 2 0 3 0
-- !query
SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT c1 + 1 AS c1), LATERAL (SELECT c1))
-- !query schema
struct<c1:int,c2:int,c1:int,c1:int>
-- !query output
0 1 1 1
1 2 2 2
-- !query
SELECT * FROM t1, LATERAL (
SELECT * FROM (SELECT c1, MIN(c2) m FROM t2 WHERE t1.c1 = t2.c1 GROUP BY c1) s,
LATERAL (SELECT m WHERE m > c1)
)
-- !query schema
struct<c1:int,c2:int,c1:int,m:int,m:int>
-- !query output
0 1 0 2 2
-- !query
SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT t1.c1 + t2.c1))
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 't1.c1' given input columns: []; line 1 pos 61
-- !query
SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT c1), LATERAL (SELECT c2))
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'c2' given input columns: []; line 1 pos 70
-- !query
SELECT * FROM t1, LATERAL (SELECT c2, (SELECT MIN(c2) FROM t2))
-- !query schema
struct<c1:int,c2:int,c2:int,scalarsubquery():int>
-- !query output
0 1 1 2
1 2 2 2
-- !query
SELECT * FROM t1, LATERAL (SELECT (SELECT SUM(c2) FROM t2 WHERE c1 = a) FROM (SELECT c1 AS a))
-- !query schema
struct<c1:int,c2:int,scalarsubquery(a):bigint>
-- !query output
0 1 5
1 2 NULL
-- !query
SELECT * FROM t1, LATERAL (SELECT c1, (SELECT SUM(c2) FROM t2 WHERE c1 = t1.c1))
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 't1.c1' given input columns: [spark_catalog.default.t2.c1, spark_catalog.default.t2.c2]; line 1 pos 73
-- !query
SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a))
-- !query schema
struct<c1:int,c2:int>
-- !query output
0 1
-- !query
SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a) WHERE c1 = t1.c1)
-- !query schema
struct<c1:int,c2:int>
-- !query output
0 1
-- !query
SELECT * FROM t1, LATERAL (SELECT COUNT(*) AS cnt FROM t2 WHERE c1 = t1.c1) WHERE cnt = 0
-- !query schema
struct<c1:int,c2:int,cnt:bigint>
-- !query output
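The empty result recorded above looks like the COUNT bug that the PR description lists as a follow-up (SPARK-35551): for the `t1` row with `c1 = 1` there are no matching `t2` rows, so a per-row evaluation would give `cnt = 0` and the row `(1, 2, 0)` should survive the filter. The following is a minimal Python sketch of that difference. It is not Spark code; the function names are hypothetical, and the "decorrelated" variant is only a simplified model of aggregating first and then inner-joining on the pulled-up predicate.

```python
# t1 and t2 mirror the views created at the top of this golden file.
t1 = [(0, 1), (1, 2)]
t2 = [(0, 2), (0, 3)]


def lateral_count_nested_loop(outer_rows, inner_rows):
    """Reference semantics: evaluate COUNT(*) once per outer row."""
    result = []
    for c1, c2 in outer_rows:
        cnt = sum(1 for inner_c1, _ in inner_rows if inner_c1 == c1)
        result.append((c1, c2, cnt))
    return result


def lateral_count_decorrelated(outer_rows, inner_rows):
    """Simplified rewrite: group the inner side, then inner-join on c1."""
    counts = {}
    for inner_c1, _ in inner_rows:
        counts[inner_c1] = counts.get(inner_c1, 0) + 1
    # Outer rows without a matching group have nothing to join with,
    # so they disappear instead of producing cnt = 0.
    return [(c1, c2, counts[c1]) for c1, c2 in outer_rows if c1 in counts]


# Apply the outer "WHERE cnt = 0" filter to both variants.
expected = [row for row in lateral_count_nested_loop(t1, t2) if row[2] == 0]
rewritten = [row for row in lateral_count_decorrelated(t1, t2) if row[2] == 0]
```

Under these assumptions `expected` keeps the row for `c1 = 1` while `rewritten` is empty, which matches the empty output recorded for this query.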
-- !query
SELECT * FROM t1 LEFT JOIN LATERAL (SELECT MIN(c2) FROM t2 WHERE c1 = t1.c1 GROUP BY c1)
-- !query schema
struct<c1:int,c2:int,min(c2):int>
-- !query output
0 1 2
1 2 NULL
-- !query
WITH cte1 AS (
SELECT c1 FROM t1
), cte2 AS (
SELECT s.* FROM cte1, LATERAL (SELECT * FROM t2 WHERE c1 = cte1.c1) s
)
SELECT * FROM cte2
-- !query schema
struct<c1:int,c2:int>
-- !query output
0 2
0 3
-- !query
DROP VIEW t1
-- !query schema
struct<>
-- !query output
-- !query
DROP VIEW t2
-- !query schema
struct<>
-- !query output
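The PR description views a lateral subquery as a dependent (nested loop) join, where the right side is evaluated once for each row of the left side. As a rough illustration, the sketch below models that in plain Python and reproduces three of the results recorded in this file. The helper `lateral_join` and its parameters are hypothetical names chosen for this example, not part of Spark.

```python
# t1 and t2 mirror the views created at the top of this golden file.
t1 = [(0, 1), (1, 2)]
t2 = [(0, 2), (0, 3)]


def lateral_join(outer_rows, subquery, left_outer=False, width=1):
    """Run `subquery(outer_row)` per outer row and concatenate the results.

    `subquery` returns a list of tuples. With `left_outer=True`, an outer
    row without matches is padded with `width` None values.
    """
    joined = []
    for outer in outer_rows:
        inner_rows = subquery(outer)
        if not inner_rows and left_outer:
            inner_rows = [(None,) * width]
        joined.extend(outer + inner for inner in inner_rows)
    return joined


# SELECT * FROM t1, LATERAL (SELECT c2 FROM t2 WHERE t1.c1 = t2.c1)
correlated = lateral_join(t1, lambda o: [(c2,) for c1, c2 in t2 if o[0] == c1])


# SELECT * FROM t1 LEFT JOIN LATERAL
#   (SELECT MIN(c2) FROM t2 WHERE c1 = t1.c1 GROUP BY c1)
def min_c2_grouped(outer):
    matches = [c2 for c1, c2 in t2 if c1 == outer[0]]
    # GROUP BY yields no row at all when nothing matches.
    return [(min(matches),)] if matches else []


left = lateral_join(t1, min_c2_grouped, left_outer=True)

# SELECT * FROM t1, LATERAL (SELECT c1 + c2 AS a),
#   LATERAL (SELECT c1 - c2 AS b), LATERAL (SELECT a * b AS c)
step_a = lateral_join(t1, lambda o: [(o[0] + o[1],)])
step_b = lateral_join(step_a, lambda o: [(o[0] - o[1],)])
chained = lateral_join(step_b, lambda o: [(o[2] * o[3],)])
```

Each later `LATERAL` item sees every column produced by the items before it, which is why the chained example can refer to `a` and `b` in its last step.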


@@ -1,5 +1,18 @@
-- Automatically generated by SQLQueryTestSuite
- -- Number of queries: 181
+ -- Number of queries: 193
+ -- !query
+ CREATE OR REPLACE TEMPORARY VIEW INT2_TBL(f1) AS VALUES
+ (smallint(trim('0 '))),
+ (smallint(trim(' 1234 '))),
+ (smallint(trim(' -1234'))),
+ (smallint('32767')),
+ (smallint('-32767'))
+ -- !query schema
+ struct<>
+ -- !query output
-- !query
@@ -3277,6 +3290,197 @@ org.apache.spark.sql.AnalysisException
cannot resolve 'uunique1' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 7
-- !query
select unique2, x.*
from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x
-- !query schema
struct<unique2:int,f1:int>
-- !query output
9998 0
-- !query
select unique2, x.*
from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss
-- !query schema
struct<unique2:int,f1:int>
-- !query output
9998 0
-- !query
select unique2, x.*
from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true
-- !query schema
struct<unique2:int,f1:int>
-- !query output
9998 0
NULL -123456
NULL -2147483647
NULL 123456
NULL 2147483647
-- !query
select * from (select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
lateral (select x) ss2(y)
-- !query schema
struct<x:double,f1:int,y:double>
-- !query output
0.0 0 0.0
-- !query
select * from ((select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1) j,
lateral (select x) ss2(y)
-- !query schema
struct<x:double,f1:int,y:double>
-- !query output
0.0 0 0.0
-- !query
select * from
int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2)
-- !query schema
struct<q1:bigint,q2:bigint,q1:bigint,q2:bigint,xq1:bigint,yq1:bigint,yq2:bigint>
-- !query output
123 456 NULL NULL 123 NULL NULL
123 4567890123456789 4567890123456789 -4567890123456789 123 4567890123456789 -4567890123456789
123 4567890123456789 4567890123456789 123 123 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123 4567890123456789 4567890123456789
4567890123456789 -4567890123456789 NULL NULL 4567890123456789 NULL NULL
4567890123456789 123 123 456 4567890123456789 123 456
4567890123456789 123 123 4567890123456789 4567890123456789 123 4567890123456789
4567890123456789 4567890123456789 4567890123456789 -4567890123456789 4567890123456789 4567890123456789 -4567890123456789
4567890123456789 4567890123456789 4567890123456789 123 4567890123456789 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select x.* from
int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2)
-- !query schema
struct<q1:bigint,q2:bigint>
-- !query output
123 456
123 4567890123456789
123 4567890123456789
123 4567890123456789
4567890123456789 -4567890123456789
4567890123456789 123
4567890123456789 123
4567890123456789 4567890123456789
4567890123456789 4567890123456789
4567890123456789 4567890123456789
-- !query
select * from
int8_tbl a left join
lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1
-- !query schema
struct<q1:bigint,q2:bigint,q1:bigint,q2:bigint,x:bigint>
-- !query output
123 456 NULL NULL NULL
123 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
123 4567890123456789 4567890123456789 123 4567890123456789
123 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 -4567890123456789 NULL NULL NULL
4567890123456789 123 123 456 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 123 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select * from
int8_tbl a left join
lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1
-- !query schema
struct<q1:bigint,q2:bigint,q1:bigint,q2:bigint,x:bigint>
-- !query output
123 456 NULL NULL NULL
123 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
123 4567890123456789 4567890123456789 123 4567890123456789
123 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 -4567890123456789 NULL NULL NULL
4567890123456789 123 123 456 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 123 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select * from int4_tbl i left join
lateral (select * from int2_tbl j where i.f1 = j.f1) k on true
-- !query schema
struct<f1:int,f1:smallint>
-- !query output
-123456 NULL
-2147483647 NULL
0 0
123456 NULL
2147483647 NULL
-- !query
select * from
int8_tbl a left join lateral
(select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
int8_tbl b cross join int8_tbl c) ss
on a.q2 = ss.bq1
-- !query schema
struct<q1:bigint,q2:bigint,bq1:bigint,cq1:bigint,least(outer(a.q1), q1, q1):bigint>
-- !query output
123 456 NULL NULL NULL
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
4567890123456789 -4567890123456789 NULL NULL NULL
4567890123456789 123 123 123 123
4567890123456789 123 123 123 123
4567890123456789 123 123 123 123
4567890123456789 123 123 123 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select f1,g from int4_tbl a, (select f1 as g) ss
-- !query schema
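The rows recorded earlier in this file for `int4_tbl i left join lateral (select * from int2_tbl j where i.f1 = j.f1) k on true` are ordered as strings (`-123456` before `-2147483647` before `0`), not numerically. The Python sketch below reproduces that result under two assumptions that the diff itself does not state: that the test harness sorts the rendered rows lexicographically when the query has no ORDER BY, and that columns are tab-separated with nulls printed as `NULL`. All function names are hypothetical.

```python
# Values taken from the INT4_TBL and INT2_TBL view definitions in this PR.
int4_tbl = [0, 123456, -123456, 2147483647, -2147483647]
int2_tbl = [0, 1234, -1234, 32767, -32767]


def left_lateral_join(outer_values, inner_values):
    """LEFT JOIN LATERAL (... WHERE i.f1 = j.f1) ON true, as a nested loop."""
    rows = []
    for i_f1 in outer_values:
        matches = [j_f1 for j_f1 in inner_values if j_f1 == i_f1]
        # An outer row without matches is kept and padded with NULL (None).
        for j_f1 in matches or [None]:
            rows.append((i_f1, j_f1))
    return rows


def render(row):
    """Render one row the way the golden file is assumed to print it."""
    return "\t".join("NULL" if value is None else str(value) for value in row)


# Assumed harness behaviour: sort the rendered lines as plain strings.
golden_lines = sorted(render(row) for row in left_lateral_join(int4_tbl, int2_tbl))
```

Only the `0` row finds a match in `int2_tbl`; every other `int4_tbl` value is outside the matching set, so it is padded with `NULL`.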


@@ -1,5 +1,18 @@
-- Automatically generated by SQLQueryTestSuite
- -- Number of queries: 185
+ -- Number of queries: 197
+ -- !query
+ CREATE OR REPLACE TEMPORARY VIEW INT2_TBL(f1) AS VALUES
+ (smallint(trim('0 '))),
+ (smallint(trim(' 1234 '))),
+ (smallint(trim(' -1234'))),
+ (smallint('32767')),
+ (smallint('-32767'))
+ -- !query schema
+ struct<>
+ -- !query output
-- !query
@@ -3305,6 +3318,197 @@ org.apache.spark.sql.AnalysisException
cannot resolve 'uunique1' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 11
-- !query
select unique2, x.*
from tenk1 a, lateral (select * from int4_tbl b where f1 = a.unique1) x
-- !query schema
struct<unique2:int,f1:int>
-- !query output
9998 0
-- !query
select unique2, x.*
from int4_tbl x, lateral (select unique2 from tenk1 where f1 = unique1) ss
-- !query schema
struct<unique2:int,f1:int>
-- !query output
9998 0
-- !query
select unique2, x.*
from int4_tbl x left join lateral (select unique1, unique2 from tenk1 where f1 = unique1) ss on true
-- !query schema
struct<unique2:int,f1:int>
-- !query output
9998 0
NULL -123456
NULL -2147483647
NULL 123456
NULL 2147483647
-- !query
select * from (select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1,
lateral (select x) ss2(y)
-- !query schema
struct<x:double,f1:int,y:double>
-- !query output
0.0 0 0.0
-- !query
select * from ((select f1/2 as x from int4_tbl) ss1 join int4_tbl i4 on x = f1) j,
lateral (select x) ss2(y)
-- !query schema
struct<x:double,f1:int,y:double>
-- !query output
0.0 0 0.0
-- !query
select * from
int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2)
-- !query schema
struct<q1:bigint,q2:bigint,q1:bigint,q2:bigint,xq1:bigint,yq1:bigint,yq2:bigint>
-- !query output
123 456 NULL NULL 123 NULL NULL
123 4567890123456789 4567890123456789 -4567890123456789 123 4567890123456789 -4567890123456789
123 4567890123456789 4567890123456789 123 123 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123 4567890123456789 4567890123456789
4567890123456789 -4567890123456789 NULL NULL 4567890123456789 NULL NULL
4567890123456789 123 123 456 4567890123456789 123 456
4567890123456789 123 123 4567890123456789 4567890123456789 123 4567890123456789
4567890123456789 4567890123456789 4567890123456789 -4567890123456789 4567890123456789 4567890123456789 -4567890123456789
4567890123456789 4567890123456789 4567890123456789 123 4567890123456789 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select x.* from
int8_tbl x left join (select q1,coalesce(q2,0) q2 from int8_tbl) y on x.q2 = y.q1,
lateral (select x.q1,y.q1,y.q2) v(xq1,yq1,yq2)
-- !query schema
struct<q1:bigint,q2:bigint>
-- !query output
123 456
123 4567890123456789
123 4567890123456789
123 4567890123456789
4567890123456789 -4567890123456789
4567890123456789 123
4567890123456789 123
4567890123456789 4567890123456789
4567890123456789 4567890123456789
4567890123456789 4567890123456789
-- !query
select * from
int8_tbl a left join
lateral (select *, a.q2 as x from int8_tbl b) ss on a.q2 = ss.q1
-- !query schema
struct<q1:bigint,q2:bigint,q1:bigint,q2:bigint,x:bigint>
-- !query output
123 456 NULL NULL NULL
123 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
123 4567890123456789 4567890123456789 123 4567890123456789
123 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 -4567890123456789 NULL NULL NULL
4567890123456789 123 123 456 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 123 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select * from
int8_tbl a left join
lateral (select *, coalesce(a.q2, 42) as x from int8_tbl b) ss on a.q2 = ss.q1
-- !query schema
struct<q1:bigint,q2:bigint,q1:bigint,q2:bigint,x:bigint>
-- !query output
123 456 NULL NULL NULL
123 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
123 4567890123456789 4567890123456789 123 4567890123456789
123 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 -4567890123456789 NULL NULL NULL
4567890123456789 123 123 456 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 -4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 123 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select * from int4_tbl i left join
lateral (select * from int2_tbl j where i.f1 = j.f1) k on true
-- !query schema
struct<f1:int,f1:smallint>
-- !query output
-123456 NULL
-2147483647 NULL
0 0
123456 NULL
2147483647 NULL
-- !query
select * from
int8_tbl a left join lateral
(select b.q1 as bq1, c.q1 as cq1, least(a.q1,b.q1,c.q1) from
int8_tbl b cross join int8_tbl c) ss
on a.q2 = ss.bq1
-- !query schema
struct<q1:bigint,q2:bigint,bq1:bigint,cq1:bigint,least(outer(a.q1), q1, q1):bigint>
-- !query output
123 456 NULL NULL NULL
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 123 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
123 4567890123456789 4567890123456789 4567890123456789 123
4567890123456789 -4567890123456789 NULL NULL NULL
4567890123456789 123 123 123 123
4567890123456789 123 123 123 123
4567890123456789 123 123 123 123
4567890123456789 123 123 123 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 123 123 4567890123456789 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 123 123
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
4567890123456789 4567890123456789 4567890123456789 4567890123456789 4567890123456789
-- !query
select udf(udf(f1,g)) from int4_tbl a, (select udf(udf(f1)) as g) ss
-- !query schema


@@ -147,6 +147,14 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
"+- Range (0, 3, step=1, splits=None)")
}
+ test("explain lateral joins") {
+ checkKeywordsExistsInExplain(
+ sql("SELECT * FROM VALUES (0, 1) AS (a, b), LATERAL (SELECT a)"),
+ "LateralJoin lateral-subquery#x [a#x], Inner",
+ "Project [outer(a#x) AS a#x]"
+ )
+ }
test("explain string functions") {
// Check if catalyst combine nested `Concat`s
val df1 = sql(