[SPARK-34141][SQL] Remove side effect from ExtractGenerator

### What changes were proposed in this pull request?

Rewrote one `ExtractGenerator` case such that it would not rely on a side effect of the flatmap function.

### Why are the changes needed?

With the dataframe api it is possible to have a lazy sequence as the `output` of a `LogicalPlan`. When exploding a column on this dataframe using the `withColumn("newName", explode(col("name")))` method, the `ExtractGenerator` does not extract the generator and `CheckAnalysis` would throw an exception.

### Does this PR introduce _any_ user-facing change?

Bugfix
Before this, the work around was to put `.select("*")` before the explode.

### How was this patch tested?

UT

Closes #31213 from tanelk/SPARK-34141_extract_generator.

Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
This commit is contained in:
tanel.kiis@gmail.com 2021-02-06 13:27:07 -06:00 committed by Sean Owen
parent a9969faca7
commit c73f70bb0d
3 changed files with 97 additions and 15 deletions

View file

@ -2718,19 +2718,16 @@ class Analyzer(override val catalogManager: CatalogManager)
p
case p @ Project(projectList, child) =>
// Holds the resolved generator, if one exists in the project list.
var resolvedGenerator: Generate = null
val newProjectList = projectList
val (resolvedGenerator, newProjectList) = projectList
.map(trimNonTopLevelAliases)
.flatMap {
case AliasedGenerator(generator, names, outer) if generator.childrenResolved =>
// It's a sanity check, this should not happen as the previous case will throw
// exception earlier.
assert(resolvedGenerator == null, "More than one generator found in SELECT.")
.foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { (res, e) =>
e match {
case AliasedGenerator(generator, names, outer) if generator.childrenResolved =>
// It's a sanity check, this should not happen as the previous case will throw
// exception earlier.
assert(res._1.isEmpty, "More than one generator found in SELECT.")
resolvedGenerator =
Generate(
val g = Generate(
generator,
unrequiredChildIndex = Nil,
outer = outer,
@ -2738,12 +2735,14 @@ class Analyzer(override val catalogManager: CatalogManager)
generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
child)
resolvedGenerator.generatorOutput
case other => other :: Nil
(Some(g), res._2 ++ g.generatorOutput)
case other =>
(res._1, res._2 :+ other)
}
}
if (resolvedGenerator != null) {
Project(newProjectList, resolvedGenerator)
if (resolvedGenerator.isDefined) {
Project(newProjectList, resolvedGenerator.get)
} else {
p
}

View file

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
/**
* Note: this test supports Scala 2.12. A parallel source tree has a 2.13 implementation.
*/
class ExtractGeneratorSuite extends AnalysisTest {
test("SPARK-34141: ExtractGenerator with lazy project list") {
val b = AttributeReference("b", ArrayType(StringType))()
val columns = AttributeReference("a", StringType)() :: b :: Nil
val explode = Alias(Explode(b), "c")()
// view is a lazy seq
val rel = LocalRelation(output = columns.view)
val plan = Project(rel.output ++ (explode :: Nil), rel)
assertAnalysisSuccess(plan)
}
}

View file

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis
import scala.collection.immutable.LazyList
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
/**
* Note: this test supports Scala 2.13. A parallel source tree has a 2.12 implementation.
*/
class ExtractGeneratorSuite extends AnalysisTest {
test("SPARK-34141: ExtractGenerator with lazy project list") {
val b = AttributeReference("b", ArrayType(StringType))()
val columns = AttributeReference("a", StringType)() #:: b #:: LazyList.empty
val explode = Alias(Explode(b), "c")()
val rel = LocalRelation(output = columns)
val plan = Project(rel.output ++ (explode :: Nil), rel)
assertAnalysisSuccess(plan)
}
}