[SPARK-17100] [SQL] fix Python udf in filter on top of outer join

## What changes were proposed in this pull request?

In optimizer, we try to evaluate the condition to see whether it's nullable or not, but some expressions are not evaluable, we should check that before evaluate it.

## How was this patch tested?

Added regression tests.

Author: Davies Liu <davies@databricks.com>

Closes #15103 from davies/udf_join.
This commit is contained in:
Davies Liu 2016-09-19 13:24:16 -07:00 committed by Davies Liu
parent e063206263
commit d8104158a9
2 changed files with 11 additions and 1 deletions

View file

@ -328,6 +328,14 @@ class SQLTests(ReusedPySparkTestCase):
[row] = self.spark.sql("SELECT double(add(1, 2)), add(double(2), 1)").collect()
self.assertEqual(tuple(row), (6, 5))
def test_udf_in_filter_on_top_of_outer_join(self):
from pyspark.sql.functions import udf
left = self.spark.createDataFrame([Row(a=1)])
right = self.spark.createDataFrame([Row(a=1)])
df = left.join(right, on='a', how='left_outer')
df = df.withColumn('b', udf(lambda x: 'x')(df.a))
self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')])
def test_udf_without_arguments(self):
self.spark.catalog.registerFunction("foo", lambda: "bar")
[row] = self.spark.sql("SELECT foo()").collect()

View file

@ -109,7 +109,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
val boundE = BindReferences.bindReference(e, attributes)
if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false
val v = boundE.eval(emptyRow)
v == null || v == false
}