Commit graph

25114 commits

Jungtaek Lim (HeartSaVioR) b37c8d5cea
[SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
## What changes were proposed in this pull request?

This patch modifies the explanation of the guarantee for ForeachWriter, as Spark doesn't guarantee the same output for `(partitionId, epochId)`. Refer to the description of [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.

Spark itself still guarantees the same output for the same epochId (batch) if the preconditions are met: 1) the source always provides the same input records for the same offset request, and 2) the query is idempotent overall (non-deterministic calculations like now() or random() can break this).

Treating broken preconditions as an exceptional case (the preconditions were implicitly required even before), we can still describe the guarantee in terms of `epochId`, though it will be harder to leverage: 1) ForeachWriter needs to implement a way to track whether all partitions were written successfully for a given `epochId`, and 2) there is little chance to leverage this fact, since the chance of Spark successfully writing all partitions but then failing to checkpoint the batch is small.
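
For illustration, below is a minimal Scala sketch of a `ForeachWriter` that leverages the `epochId`-level guarantee in the way described above. The `CommitTracker` object is a hypothetical, in-memory stand-in for an external durable store; a real sink would have to persist this state transactionally outside the application.

```scala
import scala.collection.mutable

import org.apache.spark.sql.ForeachWriter

// Hypothetical stand-in for a durable, external commit log keyed by epochId.
// A real implementation must persist this state outside the Spark application.
object CommitTracker {
  private val committedEpochs = mutable.Set.empty[Long]

  def isEpochCommitted(epochId: Long): Boolean = synchronized(committedEpochs(epochId))

  def writePartition(epochId: Long, partitionId: Long, rows: Seq[String]): Unit = synchronized {
    // Write the rows transactionally and record (epochId, partitionId) as done;
    // once every partition of the epoch is recorded, mark the epoch committed.
  }
}

class IdempotentWriter extends ForeachWriter[String] {
  private var partitionId: Long = _
  private var epochId: Long = _
  private var rows: mutable.ArrayBuffer[String] = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    this.partitionId = partitionId
    this.epochId = epochId
    rows = mutable.ArrayBuffer.empty[String]
    // Skip partitions belonging to an epoch the sink has already fully committed.
    !CommitTracker.isEpochCommitted(epochId)
  }

  override def process(value: String): Unit = rows += value

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      CommitTracker.writePartition(epochId, partitionId, rows.toSeq)
    }
  }
}
```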

Credit to zsxwing for discovering the broken guarantee.

## How was this patch tested?

This is just a documentation change, covering both the javadoc and the guide doc.

Closes #25407 from HeartSaVioR/SPARK-28650.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-08-20 00:56:53 -07:00
lihao 79464bed2f [SPARK-28662][SQL] Create Hive Partitioned Table DDL should fail when partition column type missed
## What changes were proposed in this pull request?
Creating a Hive partitioned table without specifying the data type of the partition column will succeed unexpectedly.
```HiveQL
-- create a hive table partitioned by b, but the data type of b isn't specified.
CREATE TABLE tbl(a int) PARTITIONED BY (b) STORED AS parquet
```
In https://issues.apache.org/jira/browse/SPARK-26435, the PARTITIONED BY clause was extended to support Hive CTAS as follows:
```ANTLR
// Before
(PARTITIONED BY '(' partitionColumns=colTypeList ')'

// After
(PARTITIONED BY '(' partitionColumns=colTypeList ')'|
PARTITIONED BY partitionColumnNames=identifierList) |
```

A CREATE TABLE statement like the case above will pass the syntax check and be recognized as (PARTITIONED BY partitionColumnNames=identifierList).

This PR checks this case in visitCreateHiveTable and throws an exception with an explicit error message for the user.
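
As a rough illustration of the intended behavior (the exact exception type and error message are assumptions here, not quoted from the patch), the untyped partition column should now be rejected while a typed one works; this assumes a Hive-enabled SparkSession in spark-shell:

```scala
import scala.util.{Failure, Success, Try}

// Requires a SparkSession built with enableHiveSupport().
Try(spark.sql("CREATE TABLE tbl(a INT) PARTITIONED BY (b) STORED AS parquet")) match {
  case Failure(e) => println(s"Rejected as expected: ${e.getMessage}") // behavior with this fix
  case Success(_) => println("Created without a partition column type") // pre-fix behavior
}

// With the fix, the partition column's data type must be given explicitly:
spark.sql("CREATE TABLE tbl2(a INT) PARTITIONED BY (b STRING) STORED AS parquet")
```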

## How was this patch tested?

Added tests.

Closes #25390 from lidinghao/hive-ddl-fix.

Authored-by: lihao <lihaowhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-20 14:37:04 +08:00
WeichenXu bc75ed675b [SPARK-28483][CORE] Fix canceling a spark job using barrier mode but barrier tasks blocking on BarrierTaskContext.barrier()
## What changes were proposed in this pull request?

Fix canceling a Spark job that uses barrier mode when barrier tasks do not exit.
Currently, when Spark tasks are killed, `BarrierTaskContext.barrier()` cannot be killed (it keeps blocking on the RPC request), which causes the task to hang and never exit.

In this PR I implement an RPC interface in class `RpcEndpointRef` that supports `abort`:
```
  def askAbortable[T: ClassTag](
      message: Any,
      timeout: RpcTimeout): AbortableRpcFuture[T]
```

The returned `AbortableRpcFuture` instance includes an `abort` method so that we can abort the RPC before it times out.
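
Conceptually, an abortable RPC future can be built by racing the underlying future against an explicit failure. The self-contained sketch below shows only this idea; it is not Spark's actual `AbortableRpcFuture` implementation.

```scala
import scala.concurrent.{Future, Promise}

// Concept sketch: a caller blocked on `future` is released either when the
// underlying RPC completes or when abort() fails the promise first.
class AbortableFuture[T](underlying: Future[T]) {
  private val promise = Promise[T]()
  promise.completeWith(underlying)

  def future: Future[T] = promise.future

  // Returns true if the abort won the race, false if the RPC already completed.
  def abort(cause: Throwable): Boolean = promise.tryFailure(cause)
}
```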

## How was this patch tested?

Unit test added.

Manually tested:

### Test code
Launch spark-shell via `spark-shell --master local[4]`
and run the following code:
```
sc.setLogLevel("INFO")
import org.apache.spark.BarrierTaskContext
val n = 4
def taskf(iter: Iterator[Int]): Iterator[Int] = {
  val context = BarrierTaskContext.get()
  val x = iter.next()
  if (x % 2 == 0) {
    // sleep up to 6000000 seconds, checking whether the task has been killed
    for (i <- 0 until 6000000) {
      Thread.sleep(1000)
      if (context.isInterrupted()) {
        throw new org.apache.spark.TaskKilledException()
      }
    }
  }
  context.barrier()
  return Iterator.empty
}

// launch spark job, including 4 tasks, tasks 1/3 will enter `barrier()`, and tasks 0/2 will enter `sleep`
sc.parallelize((0 to n), n).barrier().mapPartitions(taskf).collect()
```
And then press Ctrl+C to exit the running job.

### Before
Press Ctrl+C to exit the running job, then open the Spark UI: we can see that 2 tasks (tasks 1/3) are not killed; they remain blocked.

### After
Press Ctrl+C to exit the running job; in the Spark UI we can see that all tasks were killed successfully.

Closes #25235 from WeichenXu123/sc_14848.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-20 14:21:47 +08:00
Yuanjian Li 0d3a783cc5 [SPARK-28699][CORE] Fix a corner case for aborting indeterminate stage
### What changes were proposed in this pull request?
Change the logic of collecting indeterminate stages: we should look at the stages from mapStage, not from failedStage, when handling FetchFailed.

### Why are the changes needed?
In the fetch-failed error handling logic, the original code collects indeterminate stages starting from the fetch-failed stage. In the scenario where the fetch failure happens in the first task of this stage, this logic causes the indeterminate stage to be resubmitted only partially. Eventually, this can lead to a correctness bug.

### Does this PR introduce any user-facing change?
It makes this corner case abort the indeterminate stage as expected.

### How was this patch tested?
New UT in DAGSchedulerSuite.
Run the integration test below with `local-cluster[5, 2, 5120]` and set `spark.sql.execution.sortBeforeRepartition`=false; it will abort the indeterminate stage as expected:
```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes #25498 from xuanyuanking/SPARK-28699-followup.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-20 13:47:59 +08:00
darrentirto a787bc2884 [SPARK-28777][PYTHON][DOCS] Fix format_string doc string with the correct parameters
### What changes were proposed in this pull request?
The parameter names in the docstring of the function format_string were changed from _col_, _d_ to _format_, _cols_, which is what the actual function declaration states.
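
For reference, here is a small usage sketch with the corrected parameter order (the format string first, then the columns), shown with the Scala API as run in spark-shell:

```scala
import org.apache.spark.sql.functions.{col, format_string}
import spark.implicits._

val df = Seq((5, "hello")).toDF("a", "b")
// format_string(format, cols*): the format comes first, then the column arguments.
df.select(format_string("%d says %s", col("a"), col("b")).alias("v")).show()
// +------------+
// |           v|
// +------------+
// |5 says hello|
// +------------+
```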

### Why are the changes needed?
The parameters stated in the documentation were inaccurate.

### Does this PR introduce any user-facing change?
Yes.

**BEFORE**
![before](https://user-images.githubusercontent.com/9700541/63310013-e21a0e80-c2ad-11e9-806b-1d272c5cde12.png)

**AFTER**
![after](https://user-images.githubusercontent.com/9700541/63315812-6b870c00-c2c1-11e9-8165-82782628cd1a.png)

### How was this patch tested?
N/A: documentation only

Closes #25506 from darrentirto/SPARK-28777.

Authored-by: darrentirto <darrentirto@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-19 20:44:46 -07:00
Sean Owen 3b4e345fa1 [SPARK-28775][CORE][TESTS] Skip date 8633 in Kwajalein due to changes in tzdata2018i that only some JDK 8s use
### What changes were proposed in this pull request?

Some newer JDKs use the tzdata2018i database, which changes how certain (obscure) historical dates and timezones are handled. As before, we can pretty much safely ignore these in tests, as the value may vary by JDK.

### Why are the changes needed?

The test otherwise fails using, for example, JDK 1.8.0_222. https://bugs.openjdk.java.net/browse/JDK-8215982 has a full list of JDKs that have this change.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests

Closes #25504 from srowen/SPARK-28775.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-19 17:54:25 -07:00
Mick Jermsurawong b79cf0d143 [SPARK-28224][SQL] Check overflow in decimal Sum aggregate
## What changes were proposed in this pull request?
- Currently `sum` in aggregates for the decimal type can overflow and return null (see the sketch below).
  - The `Sum` expression codegens arithmetic on `sql.Decimal`, and the output, which preserves scale and precision, goes into `UnsafeRowWriter`, where an overflow is converted to null when writing out.
  - It also does not go through this branch in `DecimalAggregates`, because that rule expects the precision of the sum (not of the elements to be summed) to be less than 5.
4ebff5b6d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (L1400-L1403)

- This PR adds the check at the final result of the sum operator itself.
4ebff5b6d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala (L372-L376)

https://issues.apache.org/jira/browse/SPARK-28224
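
A hedged sketch of the overflow scenario (run in spark-shell; whether the overflowing sum surfaces as null or as an error depends on the Spark version and its overflow-handling configuration):

```scala
import org.apache.spark.sql.functions.{lit, sum}
import org.apache.spark.sql.types.DecimalType

// Twelve copies of a 38-digit value: their sum exceeds the maximum precision of
// DecimalType(38, 0), which is exactly the overflow case the added check targets.
val big = BigDecimal("9" * 38)
val df = spark.range(0, 12).select(lit(big).cast(DecimalType(38, 0)).as("d"))
df.select(sum("d")).show()
```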

## How was this patch tested?

- Added an integration test in the DataFrame suite

cc mgaido91 JoshRosen

Closes #25033 from mickjermsurawong-stripe/SPARK-28224.

Authored-by: Mick Jermsurawong <mickjermsurawong@stripe.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-08-20 09:47:04 +09:00
Takuya UESHIN 26f344354b [SPARK-27905][SQL][FOLLOW-UP] Add prettyNames
### What changes were proposed in this pull request?

This is a follow-up of #24761 which added a higher-order function `ArrayForAll`.
The PR mistakenly removed the `prettyName` from `ArrayExists` and forgot to add it to `ArrayForAll`.

### Why are the changes needed?

This reverts the `prettyName` of `ArrayExists` so as not to affect explained plans, and adds one to `ArrayForAll` so that its `prettyName` is consistent with the surrounding expressions.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #25501 from ueshin/issues/SPARK-27905/pretty_names.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-19 15:15:50 -07:00
Sean Owen fa7fd8f2a4 [SPARK-28434][TESTS][ML] Fix values in dummy tree in DecisionTreeSuite
### What changes were proposed in this pull request?

Fix the dummy tree created in decision tree tests to have actually consistent stats, so that it can be compared in tests more completely. The current one has values for, say, impurity that don't even match internally.

With this, the tests can assert more about stats staying correct after load.

### Why are the changes needed?

Fixes a TODO and improves the test slightly.

### Does this PR introduce any user-facing change?

None

### How was this patch tested?

Existing tests.

Closes #25485 from srowen/SPARK-28434.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-19 17:01:14 -05:00
Marcelo Vanzin 5f6eb5d20d [SPARK-28634][YARN] Ignore kerberos login config in client mode AM
This change makes the client mode AM ignore any login configuration,
which is now always handled by the driver. The previous code tried
to achieve that by modifying the configuration visible to the AM, but
that missed the case where old configuration names were being used.

Tested in real cluster with reproduction provided in the bug.

Closes #25467 from vanzin/SPARK-28634.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-19 11:06:02 -07:00
HyukjinKwon 1de4a22c52 Revert "[SPARK-28759][BUILD] Upgrade scala-maven-plugin to 4.1.1"
This reverts commit 1819a6f22e.
2019-08-19 20:32:07 +09:00
HyukjinKwon 2fd83c2820 [SPARK-28756][R][FOLLOW-UP] Specify minimum and maximum Java versions

### What changes were proposed in this pull request?

This PR proposes to specify the minimum and maximum Java versions (see https://cran.r-project.org/doc/manuals/r-release/R-exts.html#Writing-portable-packages).

There seems to be no standard way to specify both, given the documentation and other packages (see https://gist.github.com/glin/bd36cf1eb0c7f8b1f511e70e2fb20f8d).

I found two ways from existing packages on CRAN.

```
Package (<= 1 & > 2)
Package (<= 1, > 2)
```

The latter seems closer to other standard notations such as `R (>= 2.14.0), R (>= r56550)`. So I have chosen the latter way.

### Why are the changes needed?

It seems the package might be rejected by CRAN. See https://github.com/apache/spark/pull/25472#issuecomment-522405742

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

JDK 8

```bash
./build/mvn -DskipTests -Psparkr clean package
./R/run-tests.sh

...
basic tests for CRAN: .............
...
```

JDK 11

```bash
./build/mvn -DskipTests -Psparkr -Phadoop-3.2 clean package
./R/run-tests.sh

...
basic tests for CRAN: .............
...
```

Closes #25490 from HyukjinKwon/SPARK-28756.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-19 20:15:17 +09:00
Huaxin Gao ec14b6eb65 [SPARK-28393][SQL][PYTHON][TESTS] Convert and port 'pgSQL/join.sql' into UDF test base
## What changes were proposed in this pull request?

This PR adds some tests converted from `pgSQL/join.sql` to test UDFs. Please see the contribution guide of this umbrella ticket: [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
<details><summary>Diff comparing to 'join.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-join.sql.out
index f75fe05196..ad2b5dd0db 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-join.sql.out
 -240,10 +240,10  struct<>

 -- !query 27
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(i), udf(j), udf(t)
   FROM J1_TBL AS tx
 -- !query 27 schema
-struct<xxx:string,i:int,j:int,t:string>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string>
 -- !query 27 output
        0       NULL    zero
        1       4       one
 -259,10 +259,10  struct<xxx:string,i:int,j:int,t:string>

 -- !query 28
-SELECT '' AS `xxx`, *
+SELECT udf(udf('')) AS `xxx`, udf(udf(i)), udf(j), udf(t)
   FROM J1_TBL tx
 -- !query 28 schema
-struct<xxx:string,i:int,j:int,t:string>
+struct<xxx:string,CAST(udf(cast(cast(udf(cast(i as string)) as int) as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string>
 -- !query 28 output
        0       NULL    zero
        1       4       one
 -278,10 +278,10  struct<xxx:string,i:int,j:int,t:string>

 -- !query 29
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, a, udf(udf(b)), c
   FROM J1_TBL AS t1 (a, b, c)
 -- !query 29 schema
-struct<xxx:string,a:int,b:int,c:string>
+struct<xxx:string,a:int,CAST(udf(cast(cast(udf(cast(b as string)) as int) as string)) AS INT):int,c:string>
 -- !query 29 output
        0       NULL    zero
        1       4       one
 -297,10 +297,10  struct<xxx:string,a:int,b:int,c:string>

 -- !query 30
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(a), udf(b), udf(udf(c))
   FROM J1_TBL t1 (a, b, c)
 -- !query 30 schema
-struct<xxx:string,a:int,b:int,c:string>
+struct<xxx:string,CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(c as string)) as string) as string)) AS STRING):string>
 -- !query 30 output
        0       NULL    zero
        1       4       one
 -316,10 +316,10  struct<xxx:string,a:int,b:int,c:string>

 -- !query 31
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(a), b, udf(c), udf(d), e
   FROM J1_TBL t1 (a, b, c), J2_TBL t2 (d, e)
 -- !query 31 schema
-struct<xxx:string,a:int,b:int,c:string,d:int,e:int>
+struct<xxx:string,CAST(udf(cast(a as string)) AS INT):int,b:int,CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(d as string)) AS INT):int,e:int>
 -- !query 31 output
        0       NULL    zero    0       NULL
        0       NULL    zero    1       -1
 -423,7 +423,7  struct<xxx:string,a:int,b:int,c:string,d:int,e:int>

 -- !query 32
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, *
   FROM J1_TBL CROSS JOIN J2_TBL
 -- !query 32 schema
 struct<xxx:string,i:int,j:int,t:string,i:int,k:int>
 -530,20 +530,20  struct<xxx:string,i:int,j:int,t:string,i:int,k:int>

 -- !query 33
-SELECT '' AS `xxx`, i, k, t
+SELECT udf('') AS `xxx`, udf(i) AS i, udf(k), udf(t) AS t
   FROM J1_TBL CROSS JOIN J2_TBL
 -- !query 33 schema
 struct<>
 -- !query 33 output
 org.apache.spark.sql.AnalysisException
-Reference 'i' is ambiguous, could be: default.j1_tbl.i, default.j2_tbl.i.; line 1 pos 20
+Reference 'i' is ambiguous, could be: default.j1_tbl.i, default.j2_tbl.i.; line 1 pos 29

 -- !query 34
-SELECT '' AS `xxx`, t1.i, k, t
+SELECT udf('') AS `xxx`, udf(t1.i) AS i, udf(k), udf(t)
   FROM J1_TBL t1 CROSS JOIN J2_TBL t2
 -- !query 34 schema
-struct<xxx:string,i:int,k:int,t:string>
+struct<xxx:string,i:int,CAST(udf(cast(k as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string>
 -- !query 34 output
        0       -1      zero
        0       -3      zero
 -647,11 +647,11  struct<xxx:string,i:int,k:int,t:string>

 -- !query 35
-SELECT '' AS `xxx`, ii, tt, kk
+SELECT udf(udf('')) AS `xxx`, udf(udf(ii)) AS ii, udf(udf(tt)) AS tt, udf(udf(kk))
   FROM (J1_TBL CROSS JOIN J2_TBL)
     AS tx (ii, jj, tt, ii2, kk)
 -- !query 35 schema
-struct<xxx:string,ii:int,tt:string,kk:int>
+struct<xxx:string,ii:int,tt:string,CAST(udf(cast(cast(udf(cast(kk as string)) as int) as string)) AS INT):int>
 -- !query 35 output
        0       zero    -1
        0       zero    -3
 -755,10 +755,10  struct<xxx:string,ii:int,tt:string,kk:int>

 -- !query 36
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(udf(j1_tbl.i)), udf(j), udf(t), udf(a.i), udf(a.k), udf(b.i),  udf(b.k)
   FROM J1_TBL CROSS JOIN J2_TBL a CROSS JOIN J2_TBL b
 -- !query 36 schema
-struct<xxx:string,i:int,j:int,t:string,i:int,k:int,i:int,k:int>
+struct<xxx:string,CAST(udf(cast(cast(udf(cast(i as string)) as int) as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(k as string)) AS INT):int,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 36 output
        0       NULL    zero    0       NULL    0       NULL
        0       NULL    zero    0       NULL    1       -1
 -1654,10 +1654,10  struct<xxx:string,i:int,j:int,t:string,i:int,k:int,i:int,k:int>

 -- !query 37
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(i) AS i, udf(j), udf(t) AS t, udf(k)
   FROM J1_TBL INNER JOIN J2_TBL USING (i)
 -- !query 37 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,i:int,CAST(udf(cast(j as string)) AS INT):int,t:string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 37 output
        0       NULL    zero    NULL
        1       4       one     -1
 -1669,10 +1669,10  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 38
-SELECT '' AS `xxx`, *
+SELECT udf(udf('')) AS `xxx`, udf(i), udf(j) AS j, udf(t), udf(k) AS k
   FROM J1_TBL JOIN J2_TBL USING (i)
 -- !query 38 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,j:int,CAST(udf(cast(t as string)) AS STRING):string,k:int>
 -- !query 38 output
        0       NULL    zero    NULL
        1       4       one     -1
 -1684,9 +1684,9  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 39
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, *
   FROM J1_TBL t1 (a, b, c) JOIN J2_TBL t2 (a, d) USING (a)
-  ORDER BY a, d
+  ORDER BY udf(udf(a)), udf(d)
 -- !query 39 schema
 struct<xxx:string,a:int,b:int,c:string,d:int>
 -- !query 39 output
 -1700,10 +1700,10  struct<xxx:string,a:int,b:int,c:string,d:int>

 -- !query 40
-SELECT '' AS `xxx`, *
+SELECT udf(udf('')) AS `xxx`, udf(i), udf(j), udf(t), udf(k)
   FROM J1_TBL NATURAL JOIN J2_TBL
 -- !query 40 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 40 output
        0       NULL    zero    NULL
        1       4       one     -1
 -1715,10 +1715,10  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 41
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(udf(udf(a))) AS a, udf(b), udf(c), udf(d)
   FROM J1_TBL t1 (a, b, c) NATURAL JOIN J2_TBL t2 (a, d)
 -- !query 41 schema
-struct<xxx:string,a:int,b:int,c:string,d:int>
+struct<xxx:string,a:int,CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(d as string)) AS INT):int>
 -- !query 41 output
        0       NULL    zero    NULL
        1       4       one     -1
 -1730,10 +1730,10  struct<xxx:string,a:int,b:int,c:string,d:int>

 -- !query 42
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(udf(a)), udf(udf(b)), udf(udf(c)) AS c, udf(udf(udf(d))) AS d
   FROM J1_TBL t1 (a, b, c) NATURAL JOIN J2_TBL t2 (d, a)
 -- !query 42 schema
-struct<xxx:string,a:int,b:int,c:string,d:int>
+struct<xxx:string,CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(b as string)) as int) as string)) AS INT):int,c:string,d:int>
 -- !query 42 output
        0       NULL    zero    NULL
        2       3       two     2
 -1741,10 +1741,10  struct<xxx:string,a:int,b:int,c:string,d:int>

 -- !query 43
-SELECT '' AS `xxx`, *
-  FROM J1_TBL JOIN J2_TBL ON (J1_TBL.i = J2_TBL.i)
+SELECT udf('') AS `xxx`, udf(J1_TBL.i), udf(udf(J1_TBL.j)), udf(J1_TBL.t), udf(J2_TBL.i), udf(J2_TBL.k)
+  FROM J1_TBL JOIN J2_TBL ON (udf(J1_TBL.i) = J2_TBL.i)
 -- !query 43 schema
-struct<xxx:string,i:int,j:int,t:string,i:int,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(j as string)) as int) as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 43 output
        0       NULL    zero    0       NULL
        1       4       one     1       -1
 -1756,10 +1756,10  struct<xxx:string,i:int,j:int,t:string,i:int,k:int>

 -- !query 44
-SELECT '' AS `xxx`, *
-  FROM J1_TBL JOIN J2_TBL ON (J1_TBL.i = J2_TBL.k)
+SELECT udf('') AS `xxx`, udf(udf(J1_TBL.i)), udf(udf(J1_TBL.j)), udf(udf(J1_TBL.t)), J2_TBL.i, J2_TBL.k
+  FROM J1_TBL JOIN J2_TBL ON (J1_TBL.i = udf(J2_TBL.k))
 -- !query 44 schema
-struct<xxx:string,i:int,j:int,t:string,i:int,k:int>
+struct<xxx:string,CAST(udf(cast(cast(udf(cast(i as string)) as int) as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(j as string)) as int) as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(t as string)) as string) as string)) AS STRING):string,i:int,k:int>
 -- !query 44 output
        0       NULL    zero    NULL    0
        2       3       two     2       2
 -1767,10 +1767,10  struct<xxx:string,i:int,j:int,t:string,i:int,k:int>

 -- !query 45
-SELECT '' AS `xxx`, *
-  FROM J1_TBL JOIN J2_TBL ON (J1_TBL.i <= J2_TBL.k)
+SELECT udf('') AS `xxx`, udf(J1_TBL.i), udf(J1_TBL.j), udf(J1_TBL.t), udf(J2_TBL.i), udf(J2_TBL.k)
+  FROM J1_TBL JOIN J2_TBL ON (udf(J1_TBL.i) <= udf(udf(J2_TBL.k)))
 -- !query 45 schema
-struct<xxx:string,i:int,j:int,t:string,i:int,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 45 output
        0       NULL    zero    2       2
        0       NULL    zero    2       4
 -1784,11 +1784,11  struct<xxx:string,i:int,j:int,t:string,i:int,k:int>

 -- !query 46
-SELECT '' AS `xxx`, *
+SELECT udf(udf('')) AS `xxx`, udf(i), udf(j), udf(t), udf(k)
   FROM J1_TBL LEFT OUTER JOIN J2_TBL USING (i)
-  ORDER BY i, k, t
+  ORDER BY udf(udf(i)), udf(k), udf(t)
 -- !query 46 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 46 output
        NULL    NULL    null    NULL
        NULL    0       zero    NULL
 -1806,11 +1806,11  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 47
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(i), udf(j), udf(t), udf(k)
   FROM J1_TBL LEFT JOIN J2_TBL USING (i)
-  ORDER BY i, k, t
+  ORDER BY udf(i), udf(udf(k)), udf(t)
 -- !query 47 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 47 output
        NULL    NULL    null    NULL
        NULL    0       zero    NULL
 -1828,10 +1828,10  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 48
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(udf(i)), udf(j), udf(t), udf(k)
   FROM J1_TBL RIGHT OUTER JOIN J2_TBL USING (i)
 -- !query 48 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(cast(udf(cast(i as string)) as int) as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 48 output
        0       NULL    zero    NULL
        1       4       one     -1
 -1845,10 +1845,10  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 49
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(i), udf(udf(j)), udf(t), udf(k)
   FROM J1_TBL RIGHT JOIN J2_TBL USING (i)
 -- !query 49 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(j as string)) as int) as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 49 output
        0       NULL    zero    NULL
        1       4       one     -1
 -1862,11 +1862,11  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 50
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(i), udf(j), udf(udf(t)), udf(k)
   FROM J1_TBL FULL OUTER JOIN J2_TBL USING (i)
-  ORDER BY i, k, t
+  ORDER BY udf(udf(i)), udf(k), udf(t)
 -- !query 50 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(t as string)) as string) as string)) AS STRING):string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 50 output
        NULL    NULL    NULL    NULL
        NULL    NULL    null    NULL
 -1886,11 +1886,11  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 51
-SELECT '' AS `xxx`, *
+SELECT udf('') AS `xxx`, udf(i), udf(j), t, udf(udf(k))
   FROM J1_TBL FULL JOIN J2_TBL USING (i)
-  ORDER BY i, k, t
+  ORDER BY udf(udf(i)), udf(k), udf(udf(t))
 -- !query 51 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,t:string,CAST(udf(cast(cast(udf(cast(k as string)) as int) as string)) AS INT):int>
 -- !query 51 output
        NULL    NULL    NULL    NULL
        NULL    NULL    null    NULL
 -1910,19 +1910,19  struct<xxx:string,i:int,j:int,t:string,k:int>

 -- !query 52
-SELECT '' AS `xxx`, *
-  FROM J1_TBL LEFT JOIN J2_TBL USING (i) WHERE (k = 1)
+SELECT udf('') AS `xxx`, udf(i), udf(j), udf(t), udf(udf(k))
+  FROM J1_TBL LEFT JOIN J2_TBL USING (i) WHERE (udf(k) = 1)
 -- !query 52 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(cast(udf(cast(k as string)) as int) as string)) AS INT):int>
 -- !query 52 output

 -- !query 53
-SELECT '' AS `xxx`, *
-  FROM J1_TBL LEFT JOIN J2_TBL USING (i) WHERE (i = 1)
+SELECT udf('') AS `xxx`, udf(i), udf(j), udf(t), udf(k)
+  FROM J1_TBL LEFT JOIN J2_TBL USING (i) WHERE (udf(udf(i)) = udf(1))
 -- !query 53 schema
-struct<xxx:string,i:int,j:int,t:string,k:int>
+struct<xxx:string,CAST(udf(cast(i as string)) AS INT):int,CAST(udf(cast(j as string)) AS INT):int,CAST(udf(cast(t as string)) AS STRING):string,CAST(udf(cast(k as string)) AS INT):int>
 -- !query 53 output
        1       4       one     -1

 -2020,9 +2020,9  ee        NULL    42      NULL

 -- !query 65
 SELECT * FROM
-(SELECT * FROM t2) as s2
+(SELECT udf(name) as name, t2.n FROM t2) as s2
 INNER JOIN
-(SELECT * FROM t3) s3
+(SELECT udf(udf(name)) as name, t3.n FROM t3) s3
 USING (name)
 -- !query 65 schema
 struct<name:string,n:int,n:int>
 -2033,9 +2033,9  cc        22      23

 -- !query 66
 SELECT * FROM
-(SELECT * FROM t2) as s2
+(SELECT udf(udf(name)) as name, t2.n FROM t2) as s2
 LEFT JOIN
-(SELECT * FROM t3) s3
+(SELECT udf(name) as name, t3.n FROM t3) s3
 USING (name)
 -- !query 66 schema
 struct<name:string,n:int,n:int>
 -2046,13 +2046,13  ee      42      NULL

 -- !query 67
-SELECT * FROM
+SELECT udf(name), udf(udf(s2.n)), udf(s3.n) FROM
 (SELECT * FROM t2) as s2
 FULL JOIN
 (SELECT * FROM t3) s3
 USING (name)
 -- !query 67 schema
-struct<name:string,n:int,n:int>
+struct<CAST(udf(cast(name as string)) AS STRING):string,CAST(udf(cast(cast(udf(cast(n as string)) as int) as string)) AS INT):int,CAST(udf(cast(n as string)) AS INT):int>
 -- !query 67 output
 bb     12      13
 cc     22      23
 -2062,9 +2062,9  ee        42      NULL

 -- !query 68
 SELECT * FROM
-(SELECT name, n as s2_n, 2 as s2_2 FROM t2) as s2
+(SELECT udf(udf(name)) as name, udf(n) as s2_n, udf(2) as s2_2 FROM t2) as s2
 NATURAL INNER JOIN
-(SELECT name, n as s3_n, 3 as s3_2 FROM t3) s3
+(SELECT udf(name) as name, udf(udf(n)) as s3_n, udf(3) as s3_2 FROM t3) s3
 -- !query 68 schema
 struct<name:string,s2_n:int,s2_2:int,s3_n:int,s3_2:int>
 -- !query 68 output
 -2074,9 +2074,9  cc        22      2       23      3

 -- !query 69
 SELECT * FROM
-(SELECT name, n as s2_n, 2 as s2_2 FROM t2) as s2
+(SELECT udf(name) as name, udf(udf(n)) as s2_n, 2 as s2_2 FROM t2) as s2
 NATURAL LEFT JOIN
-(SELECT name, n as s3_n, 3 as s3_2 FROM t3) s3
+(SELECT udf(udf(name)) as name, udf(n) as s3_n, 3 as s3_2 FROM t3) s3
 -- !query 69 schema
 struct<name:string,s2_n:int,s2_2:int,s3_n:int,s3_2:int>
 -- !query 69 output
 -2087,9 +2087,9  ee        42      2       NULL    NULL

 -- !query 70
 SELECT * FROM
-(SELECT name, n as s2_n, 2 as s2_2 FROM t2) as s2
+(SELECT udf(name) as name, udf(n) as s2_n, 2 as s2_2 FROM t2) as s2
 NATURAL FULL JOIN
-(SELECT name, n as s3_n, 3 as s3_2 FROM t3) s3
+(SELECT udf(udf(name)) as name, udf(udf(n)) as s3_n, 3 as s3_2 FROM t3) s3
 -- !query 70 schema
 struct<name:string,s2_n:int,s2_2:int,s3_n:int,s3_2:int>
 -- !query 70 output
 -2101,11 +2101,11  ee      42      2       NULL    NULL

 -- !query 71
 SELECT * FROM
-(SELECT name, n as s1_n, 1 as s1_1 FROM t1) as s1
+(SELECT udf(udf(name)) as name, udf(n) as s1_n, 1 as s1_1 FROM t1) as s1
 NATURAL INNER JOIN
-(SELECT name, n as s2_n, 2 as s2_2 FROM t2) as s2
+(SELECT udf(name) as name, udf(n) as s2_n, 2 as s2_2 FROM t2) as s2
 NATURAL INNER JOIN
-(SELECT name, n as s3_n, 3 as s3_2 FROM t3) s3
+(SELECT udf(udf(udf(name))) as name, udf(n) as s3_n, 3 as s3_2 FROM t3) s3
 -- !query 71 schema
 struct<name:string,s1_n:int,s1_1:int,s2_n:int,s2_2:int,s3_n:int,s3_2:int>
 -- !query 71 output
 -2114,11 +2114,11  bb      11      1       12      2       13      3

 -- !query 72
 SELECT * FROM
-(SELECT name, n as s1_n, 1 as s1_1 FROM t1) as s1
+(SELECT udf(name) as name, udf(n) as s1_n, udf(udf(1)) as s1_1 FROM t1) as s1
 NATURAL FULL JOIN
-(SELECT name, n as s2_n, 2 as s2_2 FROM t2) as s2
+(SELECT udf(name) as name, udf(udf(n)) as s2_n, udf(2) as s2_2 FROM t2) as s2
 NATURAL FULL JOIN
-(SELECT name, n as s3_n, 3 as s3_2 FROM t3) s3
+(SELECT udf(udf(name)) as name, udf(n) as s3_n, udf(3) as s3_2 FROM t3) s3
 -- !query 72 schema
 struct<name:string,s1_n:int,s1_1:int,s2_n:int,s2_2:int,s3_n:int,s3_2:int>
 -- !query 72 output
 -2129,16 +2129,16  ee      NULL    NULL    42      2       NULL    NULL

 -- !query 73
-SELECT * FROM
-(SELECT name, n as s1_n FROM t1) as s1
+SELECT name, udf(udf(s1_n)), udf(s2_n), udf(s3_n) FROM
+(SELECT name, udf(udf(n)) as s1_n FROM t1) as s1
 NATURAL FULL JOIN
   (SELECT * FROM
-    (SELECT name, n as s2_n FROM t2) as s2
+    (SELECT name, udf(n) as s2_n FROM t2) as s2
     NATURAL FULL JOIN
-    (SELECT name, n as s3_n FROM t3) as s3
+    (SELECT name, udf(udf(n)) as s3_n FROM t3) as s3
   ) ss2
 -- !query 73 schema
-struct<name:string,s1_n:int,s2_n:int,s3_n:int>
+struct<name:string,CAST(udf(cast(cast(udf(cast(s1_n as string)) as int) as string)) AS INT):int,CAST(udf(cast(s2_n as string)) AS INT):int,CAST(udf(cast(s3_n as string)) AS INT):int>
 -- !query 73 output
 bb     11      12      13
 cc     NULL    22      23
 -2151,9 +2151,9  SELECT * FROM
 (SELECT name, n as s1_n FROM t1) as s1
 NATURAL FULL JOIN
   (SELECT * FROM
-    (SELECT name, n as s2_n, 2 as s2_2 FROM t2) as s2
+    (SELECT name, udf(udf(n)) as s2_n, 2 as s2_2 FROM t2) as s2
     NATURAL FULL JOIN
-    (SELECT name, n as s3_n FROM t3) as s3
+    (SELECT name, udf(n) as s3_n FROM t3) as s3
   ) ss2
 -- !query 74 schema
 struct<name:string,s1_n:int,s2_n:int,s2_2:int,s3_n:int>
 -2165,13 +2165,13  ee      NULL    42      2       NULL

 -- !query 75
-SELECT * FROM
-  (SELECT name, n as s1_n FROM t1) as s1
+SELECT s1.name, udf(s1_n), s2.name, udf(udf(s2_n)) FROM
+  (SELECT name, udf(n) as s1_n FROM t1) as s1
 FULL JOIN
   (SELECT name, 2 as s2_n FROM t2) as s2
-ON (s1_n = s2_n)
+ON (udf(udf(s1_n)) = udf(s2_n))
 -- !query 75 schema
-struct<name:string,s1_n:int,name:string,s2_n:int>
+struct<name:string,CAST(udf(cast(s1_n as string)) AS INT):int,name:string,CAST(udf(cast(cast(udf(cast(s2_n as string)) as int) as string)) AS INT):int>
 -- !query 75 output
 NULL   NULL    bb      2
 NULL   NULL    cc      2
 -2200,9 +2200,9  struct<>

 -- !query 78
-select * from x
+select udf(udf(x1)), udf(x2) from x
 -- !query 78 schema
-struct<x1:int,x2:int>
+struct<CAST(udf(cast(cast(udf(cast(x1 as string)) as int) as string)) AS INT):int,CAST(udf(cast(x2 as string)) AS INT):int>
 -- !query 78 output
 1      11
 2      22
 -2212,9 +2212,9  struct<x1:int,x2:int>

 -- !query 79
-select * from y
+select udf(y1), udf(udf(y2)) from y
 -- !query 79 schema
-struct<y1:int,y2:int>
+struct<CAST(udf(cast(y1 as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(y2 as string)) as int) as string)) AS INT):int>
 -- !query 79 output
 1      111
 2      222
 -2223,7 +2223,7  struct<y1:int,y2:int>

 -- !query 80
-select * from x left join y on (x1 = y1 and x2 is not null)
+select * from x left join y on (udf(x1) = udf(udf(y1)) and udf(x2) is not null)
 -- !query 80 schema
 struct<x1:int,x2:int,y1:int,y2:int>
 -- !query 80 output
 -2235,7 +2235,7  struct<x1:int,x2:int,y1:int,y2:int>

 -- !query 81
-select * from x left join y on (x1 = y1 and y2 is not null)
+select * from x left join y on (udf(udf(x1)) = udf(y1) and udf(y2) is not null)
 -- !query 81 schema
 struct<x1:int,x2:int,y1:int,y2:int>
 -- !query 81 output
 -2247,8 +2247,8  struct<x1:int,x2:int,y1:int,y2:int>

 -- !query 82
-select * from (x left join y on (x1 = y1)) left join x xx(xx1,xx2)
-on (x1 = xx1)
+select * from (x left join y on (udf(x1) = udf(udf(y1)))) left join x xx(xx1,xx2)
+on (udf(udf(x1)) = udf(xx1))
 -- !query 82 schema
 struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>
 -- !query 82 output
 -2260,8 +2260,8  struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>

 -- !query 83
-select * from (x left join y on (x1 = y1)) left join x xx(xx1,xx2)
-on (x1 = xx1 and x2 is not null)
+select * from (x left join y on (udf(x1) = udf(y1))) left join x xx(xx1,xx2)
+on (udf(x1) = xx1 and udf(x2) is not null)
 -- !query 83 schema
 struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>
 -- !query 83 output
 -2273,8 +2273,8  struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>

 -- !query 84
-select * from (x left join y on (x1 = y1)) left join x xx(xx1,xx2)
-on (x1 = xx1 and y2 is not null)
+select * from (x left join y on (x1 = udf(y1))) left join x xx(xx1,xx2)
+on (udf(x1) = udf(udf(xx1)) and udf(y2) is not null)
 -- !query 84 schema
 struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>
 -- !query 84 output
 -2286,8 +2286,8  struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>

 -- !query 85
-select * from (x left join y on (x1 = y1)) left join x xx(xx1,xx2)
-on (x1 = xx1 and xx2 is not null)
+select * from (x left join y on (udf(x1) = y1)) left join x xx(xx1,xx2)
+on (udf(udf(x1)) = udf(xx1) and udf(udf(xx2)) is not null)
 -- !query 85 schema
 struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>
 -- !query 85 output
 -2299,8 +2299,8  struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>

 -- !query 86
-select * from (x left join y on (x1 = y1)) left join x xx(xx1,xx2)
-on (x1 = xx1) where (x2 is not null)
+select * from (x left join y on (udf(udf(x1)) = udf(udf(y1)))) left join x xx(xx1,xx2)
+on (udf(x1) = udf(xx1)) where (udf(x2) is not null)
 -- !query 86 schema
 struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>
 -- !query 86 output
 -2310,8 +2310,8  struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>

 -- !query 87
-select * from (x left join y on (x1 = y1)) left join x xx(xx1,xx2)
-on (x1 = xx1) where (y2 is not null)
+select * from (x left join y on (udf(x1) = udf(y1))) left join x xx(xx1,xx2)
+on (udf(x1) = xx1) where (udf(y2) is not null)
 -- !query 87 schema
 struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>
 -- !query 87 output
 -2321,8 +2321,8  struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>

 -- !query 88
-select * from (x left join y on (x1 = y1)) left join x xx(xx1,xx2)
-on (x1 = xx1) where (xx2 is not null)
+select * from (x left join y on (udf(x1) = udf(y1))) left join x xx(xx1,xx2)
+on (x1 = udf(xx1)) where (xx2 is not null)
 -- !query 88 schema
 struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>
 -- !query 88 output
 -2332,75 +2332,75  struct<x1:int,x2:int,y1:int,y2:int,xx1:int,xx2:int>

 -- !query 89
-select count(*) from tenk1 a where unique1 in
-  (select unique1 from tenk1 b join tenk1 c using (unique1)
-   where b.unique2 = 42)
+select udf(udf(count(*))) from tenk1 a where udf(udf(unique1)) in
+  (select udf(unique1) from tenk1 b join tenk1 c using (unique1)
+   where udf(udf(b.unique2)) = udf(42))
 -- !query 89 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(cast(udf(cast(count(1) as string)) as bigint) as string)) AS BIGINT):bigint>
 -- !query 89 output
 1

 -- !query 90
-select count(*) from tenk1 x where
-  x.unique1 in (select a.f1 from int4_tbl a,float8_tbl b where a.f1=b.f1) and
-  x.unique1 = 0 and
-  x.unique1 in (select aa.f1 from int4_tbl aa,float8_tbl bb where aa.f1=bb.f1)
+select udf(count(*)) from tenk1 x where
+  udf(x.unique1) in (select udf(a.f1) from int4_tbl a,float8_tbl b where udf(udf(a.f1))=b.f1) and
+  udf(x.unique1) = 0 and
+  udf(x.unique1) in (select aa.f1 from int4_tbl aa,float8_tbl bb where aa.f1=udf(udf(bb.f1)))
 -- !query 90 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 90 output
 1

 -- !query 91
-select count(*) from tenk1 x where
-  x.unique1 in (select a.f1 from int4_tbl a,float8_tbl b where a.f1=b.f1) and
-  x.unique1 = 0 and
-  x.unique1 in (select aa.f1 from int4_tbl aa,float8_tbl bb where aa.f1=bb.f1)
+select udf(udf(count(*))) from tenk1 x where
+  udf(x.unique1) in (select udf(a.f1) from int4_tbl a,float8_tbl b where udf(udf(a.f1))=b.f1) and
+  udf(x.unique1) = 0 and
+  udf(udf(x.unique1)) in (select udf(aa.f1) from int4_tbl aa,float8_tbl bb where udf(aa.f1)=udf(udf(bb.f1)))
 -- !query 91 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(cast(udf(cast(count(1) as string)) as bigint) as string)) AS BIGINT):bigint>
 -- !query 91 output
 1

 -- !query 92
 select * from int8_tbl i1 left join (int8_tbl i2 join
-  (select 123 as x) ss on i2.q1 = x) on i1.q2 = i2.q2
-order by 1, 2
+  (select udf(123) as x) ss on udf(udf(i2.q1)) = udf(x)) on udf(udf(i1.q2)) = udf(udf(i2.q2))
+order by udf(udf(1)), 2
 -- !query 92 schema
 struct<q1:bigint,q2:bigint,q1:bigint,q2:bigint,x:int>
 -- !query 92 output
-123    456     123     456     123
-123    4567890123456789        123     4567890123456789        123
 4567890123456789       -4567890123456789       NULL    NULL    NULL
 4567890123456789       123     NULL    NULL    NULL
+123    456     123     456     123
+123    4567890123456789        123     4567890123456789        123
 4567890123456789       4567890123456789        123     4567890123456789        123

 -- !query 93
-select count(*)
+select udf(count(*))
 from
-  (select t3.tenthous as x1, coalesce(t1.stringu1, t2.stringu1) as x2
+  (select udf(t3.tenthous) as x1, udf(coalesce(udf(t1.stringu1), udf(t2.stringu1))) as x2
    from tenk1 t1
-   left join tenk1 t2 on t1.unique1 = t2.unique1
-   join tenk1 t3 on t1.unique2 = t3.unique2) ss,
+   left join tenk1 t2 on udf(t1.unique1) = udf(t2.unique1)
+   join tenk1 t3 on t1.unique2 = udf(t3.unique2)) ss,
   tenk1 t4,
   tenk1 t5
-where t4.thousand = t5.unique1 and ss.x1 = t4.tenthous and ss.x2 = t5.stringu1
+where udf(t4.thousand) = udf(t5.unique1) and udf(udf(ss.x1)) = t4.tenthous and udf(ss.x2) = udf(udf(t5.stringu1))
 -- !query 93 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 93 output
 1000

 -- !query 94
-select a.f1, b.f1, t.thousand, t.tenthous from
+select udf(a.f1), udf(b.f1), udf(t.thousand), udf(t.tenthous) from
   tenk1 t,
-  (select sum(f1)+1 as f1 from int4_tbl i4a) a,
-  (select sum(f1) as f1 from int4_tbl i4b) b
-where b.f1 = t.thousand and a.f1 = b.f1 and (a.f1+b.f1+999) = t.tenthous
+  (select udf(udf(sum(udf(f1))+1)) as f1 from int4_tbl i4a) a,
+  (select udf(sum(udf(f1))) as f1 from int4_tbl i4b) b
+where b.f1 = udf(t.thousand) and udf(a.f1) = udf(b.f1) and udf((udf(a.f1)+udf(b.f1)+999)) = udf(udf(t.tenthous))
 -- !query 94 schema
-struct<f1:bigint,f1:bigint,thousand:int,tenthous:int>
+struct<CAST(udf(cast(f1 as string)) AS BIGINT):bigint,CAST(udf(cast(f1 as string)) AS BIGINT):bigint,CAST(udf(cast(thousand as string)) AS INT):int,CAST(udf(cast(tenthous as string)) AS INT):int>
 -- !query 94 output

 -2408,8 +2408,8  struct<f1:bigint,f1:bigint,thousand:int,tenthous:int>
 -- !query 95
 select * from
   j1_tbl full join
-  (select * from j2_tbl order by j2_tbl.i desc, j2_tbl.k asc) j2_tbl
-  on j1_tbl.i = j2_tbl.i and j1_tbl.i = j2_tbl.k
+  (select * from j2_tbl order by udf(udf(j2_tbl.i)) desc, udf(j2_tbl.k) asc) j2_tbl
+  on udf(j1_tbl.i) = udf(j2_tbl.i) and udf(j1_tbl.i) = udf(j2_tbl.k)
 -- !query 95 schema
 struct<i:int,j:int,t:string,i:int,k:int>
 -- !query 95 output
 -2435,13 +2435,13  NULL    NULL    null    NULL    NULL

 -- !query 96
-select count(*) from
-  (select * from tenk1 x order by x.thousand, x.twothousand, x.fivethous) x
+select udf(count(*)) from
+  (select * from tenk1 x order by udf(x.thousand), udf(udf(x.twothousand)), x.fivethous) x
   left join
-  (select * from tenk1 y order by y.unique2) y
-  on x.thousand = y.unique2 and x.twothousand = y.hundred and x.fivethous = y.unique2
+  (select * from tenk1 y order by udf(y.unique2)) y
+  on udf(x.thousand) = y.unique2 and x.twothousand = udf(y.hundred) and x.fivethous = y.unique2
 -- !query 96 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 96 output
 10000

 -2507,7 +2507,7  struct<>

 -- !query 104
-select tt1.*, tt2.* from tt1 left join tt2 on tt1.joincol = tt2.joincol
+select tt1.*, tt2.* from tt1 left join tt2 on udf(udf(tt1.joincol)) = udf(tt2.joincol)
 -- !query 104 schema
 struct<tt1_id:int,joincol:int,tt2_id:int,joincol:int>
 -- !query 104 output
 -2517,7 +2517,7  struct<tt1_id:int,joincol:int,tt2_id:int,joincol:int>

 -- !query 105
-select tt1.*, tt2.* from tt2 right join tt1 on tt1.joincol = tt2.joincol
+select tt1.*, tt2.* from tt2 right join tt1 on udf(udf(tt1.joincol)) = udf(udf(tt2.joincol))
 -- !query 105 schema
 struct<tt1_id:int,joincol:int,tt2_id:int,joincol:int>
 -- !query 105 output
 -2527,10 +2527,10  struct<tt1_id:int,joincol:int,tt2_id:int,joincol:int>

 -- !query 106
-select count(*) from tenk1 a, tenk1 b
-  where a.hundred = b.thousand and (b.fivethous % 10) < 10
+select udf(count(*)) from tenk1 a, tenk1 b
+  where udf(a.hundred) = b.thousand and udf(udf((b.fivethous % 10)) < 10)
 -- !query 106 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 106 output
 100000

 -2584,14 +2584,14  struct<>

 -- !query 113
-SELECT a.f1
+SELECT udf(udf(a.f1)) as f1
 FROM tt4 a
 LEFT JOIN (
         SELECT b.f1
-        FROM tt3 b LEFT JOIN tt3 c ON (b.f1 = c.f1)
-        WHERE c.f1 IS NULL
-) AS d ON (a.f1 = d.f1)
-WHERE d.f1 IS NULL
+        FROM tt3 b LEFT JOIN tt3 c ON udf(b.f1) = udf(c.f1)
+        WHERE udf(c.f1) IS NULL
+) AS d ON udf(a.f1) = d.f1
+WHERE udf(udf(d.f1)) IS NULL
 -- !query 113 schema
 struct<f1:int>
 -- !query 113 output
 -2621,7 +2621,7  struct<>

 -- !query 116
-select * from tt5,tt6 where tt5.f1 = tt6.f1 and tt5.f1 = tt5.f2 - tt6.f2
+select * from tt5,tt6 where udf(tt5.f1) = udf(tt6.f1) and udf(tt5.f1) = udf(udf(tt5.f2) - udf(tt6.f2))
 -- !query 116 schema
 struct<f1:int,f2:int,f1:int,f2:int>
 -- !query 116 output
 -2649,12 +2649,12  struct<>

 -- !query 119
-select yy.pkyy as yy_pkyy, yy.pkxx as yy_pkxx, yya.pkyy as yya_pkyy,
-       xxa.pkxx as xxa_pkxx, xxb.pkxx as xxb_pkxx
+select udf(udf(yy.pkyy)) as yy_pkyy, udf(yy.pkxx) as yy_pkxx, udf(yya.pkyy) as yya_pkyy,
+       udf(xxa.pkxx) as xxa_pkxx, udf(xxb.pkxx) as xxb_pkxx
 from yy
-     left join (SELECT * FROM yy where pkyy = 101) as yya ON yy.pkyy = yya.pkyy
-     left join xx xxa on yya.pkxx = xxa.pkxx
-     left join xx xxb on coalesce (xxa.pkxx, 1) = xxb.pkxx
+     left join (SELECT * FROM yy where pkyy = 101) as yya ON udf(yy.pkyy) = udf(yya.pkyy)
+     left join xx xxa on udf(yya.pkxx) = udf(udf(xxa.pkxx))
+     left join xx xxb on udf(udf(coalesce (xxa.pkxx, 1))) = udf(xxb.pkxx)
 -- !query 119 schema
 struct<yy_pkyy:int,yy_pkxx:int,yya_pkyy:int,xxa_pkxx:int,xxb_pkxx:int>
 -- !query 119 output
 -2693,9 +2693,9  struct<>

 -- !query 123
 select * from
-  zt2 left join zt3 on (f2 = f3)
-      left join zt1 on (f3 = f1)
-where f2 = 53
+  zt2 left join zt3 on (udf(f2) = udf(udf(f3)))
+      left join zt1 on (udf(udf(f3)) = udf(f1))
+where udf(f2) = 53
 -- !query 123 schema
 struct<f2:int,f3:int,f1:int>
 -- !query 123 output
 -2712,9 +2712,9  struct<>

 -- !query 125
 select * from
-  zt2 left join zt3 on (f2 = f3)
-      left join zv1 on (f3 = f1)
-where f2 = 53
+  zt2 left join zt3 on (f2 = udf(f3))
+      left join zv1 on (udf(f3) = f1)
+where udf(udf(f2)) = 53
 -- !query 125 schema
 struct<f2:int,f3:int,f1:int,junk:string>
 -- !query 125 output
 -2722,12 +2722,12  struct<f2:int,f3:int,f1:int,junk:string>

 -- !query 126
-select a.unique2, a.ten, b.tenthous, b.unique2, b.hundred
-from tenk1 a left join tenk1 b on a.unique2 = b.tenthous
-where a.unique1 = 42 and
-      ((b.unique2 is null and a.ten = 2) or b.hundred = 3)
+select udf(a.unique2), udf(a.ten), udf(b.tenthous), udf(b.unique2), udf(b.hundred)
+from tenk1 a left join tenk1 b on a.unique2 = udf(b.tenthous)
+where udf(a.unique1) = 42 and
+      ((udf(b.unique2) is null and udf(a.ten) = 2) or udf(udf(b.hundred)) = udf(udf(3)))
 -- !query 126 schema
-struct<unique2:int,ten:int,tenthous:int,unique2:int,hundred:int>
+struct<CAST(udf(cast(unique2 as string)) AS INT):int,CAST(udf(cast(ten as string)) AS INT):int,CAST(udf(cast(tenthous as string)) AS INT):int,CAST(udf(cast(unique2 as string)) AS INT):int,CAST(udf(cast(hundred as string)) AS INT):int>
 -- !query 126 output

 -2749,7 +2749,7  struct<>

 -- !query 129
-select * from a left join b on i = x and i = y and x = i
+select * from a left join b on udf(i) = x and i = udf(y) and udf(x) = udf(i)
 -- !query 129 schema
 struct<i:int,x:int,y:int>
 -- !query 129 output
 -2757,11 +2757,11  struct<i:int,x:int,y:int>

 -- !query 130
-select t1.q2, count(t2.*)
-from int8_tbl t1 left join int8_tbl t2 on (t1.q2 = t2.q1)
-group by t1.q2 order by 1
+select udf(t1.q2), udf(count(t2.*))
+from int8_tbl t1 left join int8_tbl t2 on (udf(udf(t1.q2)) = t2.q1)
+group by udf(t1.q2) order by 1
 -- !query 130 schema
-struct<q2:bigint,count(q1, q2):bigint>
+struct<CAST(udf(cast(q2 as string)) AS BIGINT):bigint,CAST(udf(cast(count(q1, q2) as string)) AS BIGINT):bigint>
 -- !query 130 output
 -4567890123456789      0
 123    2
 -2770,11 +2770,11  struct<q2:bigint,count(q1, q2):bigint>

 -- !query 131
-select t1.q2, count(t2.*)
-from int8_tbl t1 left join (select * from int8_tbl) t2 on (t1.q2 = t2.q1)
-group by t1.q2 order by 1
+select udf(udf(t1.q2)), udf(count(t2.*))
+from int8_tbl t1 left join (select * from int8_tbl) t2 on (udf(udf(t1.q2)) = udf(t2.q1))
+group by udf(udf(t1.q2)) order by 1
 -- !query 131 schema
-struct<q2:bigint,count(q1, q2):bigint>
+struct<CAST(udf(cast(cast(udf(cast(q2 as string)) as bigint) as string)) AS BIGINT):bigint,CAST(udf(cast(count(q1, q2) as string)) AS BIGINT):bigint>
 -- !query 131 output
 -4567890123456789      0
 123    2
 -2783,13 +2783,13  struct<q2:bigint,count(q1, q2):bigint>

 -- !query 132
-select t1.q2, count(t2.*)
+select udf(t1.q2) as q2, udf(udf(count(t2.*)))
 from int8_tbl t1 left join
-  (select q1, case when q2=1 then 1 else q2 end as q2 from int8_tbl) t2
-  on (t1.q2 = t2.q1)
+  (select udf(q1) as q1, case when q2=1 then 1 else q2 end as q2 from int8_tbl) t2
+  on (udf(t1.q2) = udf(t2.q1))
 group by t1.q2 order by 1
 -- !query 132 schema
-struct<q2:bigint,count(q1, q2):bigint>
+struct<q2:bigint,CAST(udf(cast(cast(udf(cast(count(q1, q2) as string)) as bigint) as string)) AS BIGINT):bigint>
 -- !query 132 output
 -4567890123456789      0
 123    2
 -2828,17 +2828,17  struct<>

 -- !query 136
-select c.name, ss.code, ss.b_cnt, ss.const
+select udf(c.name), udf(ss.code), udf(ss.b_cnt), udf(ss.const)
 from c left join
   (select a.code, coalesce(b_grp.cnt, 0) as b_cnt, -1 as const
    from a left join
-     (select count(1) as cnt, b.a from b group by b.a) as b_grp
-     on a.code = b_grp.a
+     (select udf(count(1)) as cnt, b.a as a from b group by b.a) as b_grp
+     on udf(a.code) = udf(udf(b_grp.a))
   ) as ss
-  on (c.a = ss.code)
+  on (udf(udf(c.a)) = udf(ss.code))
 order by c.name
 -- !query 136 schema
-struct<name:string,code:string,b_cnt:bigint,const:int>
+struct<CAST(udf(cast(name as string)) AS STRING):string,CAST(udf(cast(code as string)) AS STRING):string,CAST(udf(cast(b_cnt as string)) AS BIGINT):bigint,CAST(udf(cast(const as string)) AS INT):int>
 -- !query 136 output
 A      p       2       -1
 B      q       0       -1
 -2852,15 +2852,15  LEFT JOIN
 ( SELECT sub3.key3, sub4.value2, COALESCE(sub4.value2, 66) as value3 FROM
     ( SELECT 1 as key3 ) sub3
     LEFT JOIN
-    ( SELECT sub5.key5, COALESCE(sub6.value1, 1) as value2 FROM
+    ( SELECT udf(sub5.key5) as key5, udf(udf(COALESCE(sub6.value1, 1))) as value2 FROM
         ( SELECT 1 as key5 ) sub5
         LEFT JOIN
         ( SELECT 2 as key6, 42 as value1 ) sub6
-        ON sub5.key5 = sub6.key6
+        ON sub5.key5 = udf(sub6.key6)
     ) sub4
-    ON sub4.key5 = sub3.key3
+    ON udf(sub4.key5) = sub3.key3
 ) sub2
-ON sub1.key1 = sub2.key3
+ON udf(udf(sub1.key1)) = udf(udf(sub2.key3))
 -- !query 137 schema
 struct<key1:int,key3:int,value2:int,value3:int>
 -- !query 137 output
 -2871,34 +2871,34  struct<key1:int,key3:int,value2:int,value3:int>
 SELECT * FROM
 ( SELECT 1 as key1 ) sub1
 LEFT JOIN
-( SELECT sub3.key3, value2, COALESCE(value2, 66) as value3 FROM
+( SELECT udf(sub3.key3) as key3, udf(value2), udf(COALESCE(value2, 66)) as value3 FROM
     ( SELECT 1 as key3 ) sub3
     LEFT JOIN
     ( SELECT sub5.key5, COALESCE(sub6.value1, 1) as value2 FROM
         ( SELECT 1 as key5 ) sub5
         LEFT JOIN
         ( SELECT 2 as key6, 42 as value1 ) sub6
-        ON sub5.key5 = sub6.key6
+        ON udf(udf(sub5.key5)) = sub6.key6
     ) sub4
     ON sub4.key5 = sub3.key3
 ) sub2
-ON sub1.key1 = sub2.key3
+ON sub1.key1 = udf(udf(sub2.key3))
 -- !query 138 schema
-struct<key1:int,key3:int,value2:int,value3:int>
+struct<key1:int,key3:int,CAST(udf(cast(value2 as string)) AS INT):int,value3:int>
 -- !query 138 output
 1      1       1       1

 -- !query 139
-SELECT qq, unique1
+SELECT udf(qq), udf(udf(unique1))
   FROM
-  ( SELECT COALESCE(q1, 0) AS qq FROM int8_tbl a ) AS ss1
+  ( SELECT udf(COALESCE(q1, 0)) AS qq FROM int8_tbl a ) AS ss1
   FULL OUTER JOIN
-  ( SELECT COALESCE(q2, -1) AS qq FROM int8_tbl b ) AS ss2
+  ( SELECT udf(udf(COALESCE(q2, -1))) AS qq FROM int8_tbl b ) AS ss2
   USING (qq)
-  INNER JOIN tenk1 c ON qq = unique2
+  INNER JOIN tenk1 c ON udf(qq) = udf(unique2)
 -- !query 139 schema
-struct<qq:bigint,unique1:int>
+struct<CAST(udf(cast(qq as string)) AS BIGINT):bigint,CAST(udf(cast(cast(udf(cast(unique1 as string)) as int) as string)) AS INT):int>
 -- !query 139 output
 123    4596
 123    4596
 -2936,19 +2936,19  struct<>

 -- !query 143
-select nt3.id
+select udf(nt3.id)
 from nt3 as nt3
   left join
-    (select nt2.*, (nt2.b1 and ss1.a3) AS b3
+    (select nt2.*, (udf(nt2.b1) and udf(ss1.a3)) AS b3
      from nt2 as nt2
        left join
-         (select nt1.*, (nt1.id is not null) as a3 from nt1) as ss1
-         on ss1.id = nt2.nt1_id
+         (select nt1.*, (udf(nt1.id) is not null) as a3 from nt1) as ss1
+         on ss1.id = udf(udf(nt2.nt1_id))
     ) as ss2
-    on ss2.id = nt3.nt2_id
-where nt3.id = 1 and ss2.b3
+    on udf(ss2.id) = nt3.nt2_id
+where udf(nt3.id) = 1 and udf(ss2.b3)
 -- !query 143 schema
-struct<id:int>
+struct<CAST(udf(cast(id as string)) AS INT):int>
 -- !query 143 output
 1

 -3003,73 +3003,73  NULL    2147483647

 -- !query 146
-select count(*) from
-  tenk1 a join tenk1 b on a.unique1 = b.unique2
-  left join tenk1 c on a.unique2 = b.unique1 and c.thousand = a.thousand
-  join int4_tbl on b.thousand = f1
+select udf(count(*)) from
+  tenk1 a join tenk1 b on udf(a.unique1) = udf(b.unique2)
+  left join tenk1 c on udf(a.unique2) = udf(b.unique1) and udf(c.thousand) = udf(udf(a.thousand))
+  join int4_tbl on udf(b.thousand) = f1
 -- !query 146 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 146 output
 10

 -- !query 147
-select b.unique1 from
-  tenk1 a join tenk1 b on a.unique1 = b.unique2
-  left join tenk1 c on b.unique1 = 42 and c.thousand = a.thousand
-  join int4_tbl i1 on b.thousand = f1
-  right join int4_tbl i2 on i2.f1 = b.tenthous
-  order by 1
+select udf(b.unique1) from
+  tenk1 a join tenk1 b on udf(a.unique1) = udf(b.unique2)
+  left join tenk1 c on udf(b.unique1) = 42 and c.thousand = udf(a.thousand)
+  join int4_tbl i1 on udf(b.thousand) = udf(udf(f1))
+  right join int4_tbl i2 on udf(udf(i2.f1)) = udf(b.tenthous)
+  order by udf(1)
 -- !query 147 schema
-struct<unique1:int>
+struct<CAST(udf(cast(unique1 as string)) AS INT):int>
 -- !query 147 output
 NULL
 NULL
+0
 NULL
 NULL
-0

 -- !query 148
 select * from
 (
-  select unique1, q1, coalesce(unique1, -1) + q1 as fault
-  from int8_tbl left join tenk1 on (q2 = unique2)
+  select udf(unique1), udf(q1), udf(udf(coalesce(unique1, -1)) + udf(q1)) as fault
+  from int8_tbl left join tenk1 on (udf(q2) = udf(unique2))
 ) ss
-where fault = 122
-order by fault
+where udf(fault) = udf(122)
+order by udf(fault)
 -- !query 148 schema
-struct<unique1:int,q1:bigint,fault:bigint>
+struct<CAST(udf(cast(unique1 as string)) AS INT):int,CAST(udf(cast(q1 as string)) AS BIGINT):bigint,fault:bigint>
 -- !query 148 output
 NULL   123     122

 -- !query 149
-select q1, unique2, thousand, hundred
-  from int8_tbl a left join tenk1 b on q1 = unique2
-  where coalesce(thousand,123) = q1 and q1 = coalesce(hundred,123)
+select udf(q1), udf(unique2), udf(thousand), udf(hundred)
+  from int8_tbl a left join tenk1 b on udf(q1) = udf(unique2)
+  where udf(coalesce(thousand,123)) = udf(q1) and udf(q1) = udf(udf(coalesce(hundred,123)))
 -- !query 149 schema
-struct<q1:bigint,unique2:int,thousand:int,hundred:int>
+struct<CAST(udf(cast(q1 as string)) AS BIGINT):bigint,CAST(udf(cast(unique2 as string)) AS INT):int,CAST(udf(cast(thousand as string)) AS INT):int,CAST(udf(cast(hundred as string)) AS INT):int>
 -- !query 149 output

 -- !query 150
-select f1, unique2, case when unique2 is null then f1 else 0 end
-  from int4_tbl a left join tenk1 b on f1 = unique2
-  where (case when unique2 is null then f1 else 0 end) = 0
+select udf(f1), udf(unique2), case when udf(udf(unique2)) is null then udf(f1) else 0 end
+  from int4_tbl a left join tenk1 b on udf(f1) = udf(udf(unique2))
+  where (case when udf(unique2) is null then udf(f1) else 0 end) = 0
 -- !query 150 schema
-struct<f1:int,unique2:int,CASE WHEN (unique2 IS NULL) THEN f1 ELSE 0 END:int>
+struct<CAST(udf(cast(f1 as string)) AS INT):int,CAST(udf(cast(unique2 as string)) AS INT):int,CASE WHEN (CAST(udf(cast(cast(udf(cast(unique2 as string)) as int) as string)) AS INT) IS NULL) THEN CAST(udf(cast(f1 as string)) AS INT) ELSE 0 END:int>
 -- !query 150 output
 0      0       0

 -- !query 151
-select a.unique1, b.unique1, c.unique1, coalesce(b.twothousand, a.twothousand)
-  from tenk1 a left join tenk1 b on b.thousand = a.unique1                        left join tenk1 c on c.unique2 = coalesce(b.twothousand, a.twothousand)
-  where a.unique2 < 10 and coalesce(b.twothousand, a.twothousand) = 44
+select udf(a.unique1), udf(b.unique1), udf(c.unique1), udf(coalesce(b.twothousand, a.twothousand))
+  from tenk1 a left join tenk1 b on udf(b.thousand) = a.unique1                       left join tenk1 c on udf(c.unique2) = udf(coalesce(b.twothousand, a.twothousand))
+  where a.unique2 < udf(10) and udf(udf(coalesce(b.twothousand, a.twothousand))) = udf(44)
 -- !query 151 schema
-struct<unique1:int,unique1:int,unique1:int,coalesce(twothousand, twothousand):int>
+struct<CAST(udf(cast(unique1 as string)) AS INT):int,CAST(udf(cast(unique1 as string)) AS INT):int,CAST(udf(cast(unique1 as string)) AS INT):int,CAST(udf(cast(coalesce(twothousand, twothousand) as string)) AS INT):int>
 -- !query 151 output

 -3078,11 +3078,11  struct<unique1:int,unique1:int,unique1:int,coalesce(twothousand, twothousand):in
 select * from
   text_tbl t1
   inner join int8_tbl i8
-  on i8.q2 = 456
+  on udf(i8.q2) = udf(udf(456))
   right join text_tbl t2
-  on t1.f1 = 'doh!'
+  on udf(t1.f1) = udf(udf('doh!'))
   left join int4_tbl i4
-  on i8.q1 = i4.f1
+  on udf(udf(i8.q1)) = i4.f1
 -- !query 152 schema
 struct<f1:string,q1:bigint,q2:bigint,f1:string,f1:int>
 -- !query 152 output
 -3092,10 +3092,10  doh!    123     456     hi de ho neighbor       NULL

 -- !query 153
 select * from
-  (select 1 as id) as xx
+  (select udf(udf(1)) as id) as xx
   left join
-    (tenk1 as a1 full join (select 1 as id) as yy on (a1.unique1 = yy.id))
-  on (xx.id = coalesce(yy.id))
+    (tenk1 as a1 full join (select udf(1) as id) as yy on (udf(a1.unique1) = udf(yy.id)))
+  on (xx.id = udf(udf(coalesce(yy.id))))
 -- !query 153 schema
 struct<id:int,unique1:int,unique2:int,two:int,four:int,ten:int,twenty:int,hundred:int,thousand:int,twothousand:int,fivethous:int,tenthous:int,odd:int,even:int,stringu1:string,stringu2:string,string4:string,id:int>
 -- !query 153 output
 -3103,11 +3103,11  struct<id:int,unique1:int,unique2:int,two:int,four:int,ten:int,twenty:int,hundre

 -- !query 154
-select a.q2, b.q1
-  from int8_tbl a left join int8_tbl b on a.q2 = coalesce(b.q1, 1)
-  where coalesce(b.q1, 1) > 0
+select udf(a.q2), udf(b.q1)
+  from int8_tbl a left join int8_tbl b on udf(a.q2) = coalesce(b.q1, 1)
+  where udf(udf(coalesce(b.q1, 1)) > 0)
 -- !query 154 schema
-struct<q2:bigint,q1:bigint>
+struct<CAST(udf(cast(q2 as string)) AS BIGINT):bigint,CAST(udf(cast(q1 as string)) AS BIGINT):bigint>
 -- !query 154 output
 -4567890123456789      NULL
 123    123
 -3142,7 +3142,7  struct<>

 -- !query 157
-select p.* from parent p left join child c on (p.k = c.k)
+select p.* from parent p left join child c on (udf(p.k) = udf(c.k))
 -- !query 157 schema
 struct<k:int,pd:int>
 -- !query 157 output
 -3153,8 +3153,8  struct<k:int,pd:int>

 -- !query 158
 select p.*, linked from parent p
-  left join (select c.*, true as linked from child c) as ss
-  on (p.k = ss.k)
+  left join (select c.*, udf(udf(true)) as linked from child c) as ss
+  on (udf(p.k) = udf(udf(ss.k)))
 -- !query 158 schema
 struct<k:int,pd:int,linked:boolean>
 -- !query 158 output
 -3165,8 +3165,8  struct<k:int,pd:int,linked:boolean>

 -- !query 159
 select p.* from
-  parent p left join child c on (p.k = c.k)
-  where p.k = 1 and p.k = 2
+  parent p left join child c on (udf(p.k) = c.k)
+  where p.k = udf(1) and udf(udf(p.k)) = udf(udf(2))
 -- !query 159 schema
 struct<k:int,pd:int>
 -- !query 159 output
 -3175,8 +3175,8  struct<k:int,pd:int>

 -- !query 160
 select p.* from
-  (parent p left join child c on (p.k = c.k)) join parent x on p.k = x.k
-  where p.k = 1 and p.k = 2
+  (parent p left join child c on (udf(p.k) = c.k)) join parent x on p.k = udf(x.k)
+  where udf(p.k) = udf(1) and udf(udf(p.k)) = udf(udf(2))
 -- !query 160 schema
 struct<k:int,pd:int>
 -- !query 160 output
 -3204,7 +3204,7  struct<>

 -- !query 163
-SELECT * FROM b LEFT JOIN a ON (b.a_id = a.id) WHERE (a.id IS NULL OR a.id > 0)
+SELECT * FROM b LEFT JOIN a ON (udf(b.a_id) = udf(a.id)) WHERE (udf(udf(a.id)) IS NULL OR udf(a.id) > 0)
 -- !query 163 schema
 struct<id:int,a_id:int,id:int>
 -- !query 163 output
 -3212,7 +3212,7  struct<id:int,a_id:int,id:int>

 -- !query 164
-SELECT b.* FROM b LEFT JOIN a ON (b.a_id = a.id) WHERE (a.id IS NULL OR a.id > 0)
+SELECT b.* FROM b LEFT JOIN a ON (udf(b.a_id) = udf(a.id)) WHERE (udf(a.id) IS NULL OR udf(udf(a.id)) > 0)
 -- !query 164 schema
 struct<id:int,a_id:int>
 -- !query 164 output
 -3231,13 +3231,13  struct<>

 -- !query 166
 SELECT * FROM
-    (SELECT 1 AS x) ss1
+    (SELECT udf(1) AS x) ss1
   LEFT JOIN
-    (SELECT q1, q2, COALESCE(dat1, q1) AS y
-     FROM int8_tbl LEFT JOIN innertab ON q2 = id) ss2
+    (SELECT udf(q1), udf(q2), udf(COALESCE(dat1, q1)) AS y
+     FROM int8_tbl LEFT JOIN innertab ON udf(udf(q2)) = id) ss2
   ON true
 -- !query 166 schema
-struct<x:int,q1:bigint,q2:bigint,y:bigint>
+struct<x:int,CAST(udf(cast(q1 as string)) AS BIGINT):bigint,CAST(udf(cast(q2 as string)) AS BIGINT):bigint,y:bigint>
 -- !query 166 output
 1      123     456     123
 1      123     4567890123456789        123
 -3248,27 +3248,27  struct<x:int,q1:bigint,q2:bigint,y:bigint>

 -- !query 167
 select * from
-  int8_tbl x join (int4_tbl x cross join int4_tbl y) j on q1 = f1
+  int8_tbl x join (int4_tbl x cross join int4_tbl y) j on udf(q1) = udf(f1)
 -- !query 167 schema
 struct<>
 -- !query 167 output
 org.apache.spark.sql.AnalysisException
-Reference 'f1' is ambiguous, could be: j.f1, j.f1.; line 2 pos 63
+Reference 'f1' is ambiguous, could be: j.f1, j.f1.; line 2 pos 72

 -- !query 168
 select * from
-  int8_tbl x join (int4_tbl x cross join int4_tbl y) j on q1 = y.f1
+  int8_tbl x join (int4_tbl x cross join int4_tbl y) j on udf(q1) = udf(y.f1)
 -- !query 168 schema
 struct<>
 -- !query 168 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`y.f1`' given input columns: [j.f1, j.f1, x.q1, x.q2]; line 2 pos 63
+cannot resolve '`y.f1`' given input columns: [j.f1, j.f1, x.q1, x.q2]; line 2 pos 72

 -- !query 169
 select * from
-  int8_tbl x join (int4_tbl x cross join int4_tbl y(ff)) j on q1 = f1
+  int8_tbl x join (int4_tbl x cross join int4_tbl y(ff)) j on udf(q1) = udf(udf(f1))
 -- !query 169 schema
 struct<q1:bigint,q2:bigint,f1:int,ff:int>
 -- !query 169 output
 -3276,69 +3276,69  struct<q1:bigint,q2:bigint,f1:int,ff:int>

 -- !query 170
-select t1.uunique1 from
-  tenk1 t1 join tenk2 t2 on t1.two = t2.two
+select udf(t1.uunique1) from
+  tenk1 t1 join tenk2 t2 on t1.two = udf(t2.two)
 -- !query 170 schema
 struct<>
 -- !query 170 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`t1.uunique1`' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 7
+cannot resolve '`t1.uunique1`' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 11

 -- !query 171
-select t2.uunique1 from
-  tenk1 t1 join tenk2 t2 on t1.two = t2.two
+select udf(udf(t2.uunique1)) from
+  tenk1 t1 join tenk2 t2 on udf(t1.two) = t2.two
 -- !query 171 schema
 struct<>
 -- !query 171 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`t2.uunique1`' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 7
+cannot resolve '`t2.uunique1`' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 15

 -- !query 172
-select uunique1 from
-  tenk1 t1 join tenk2 t2 on t1.two = t2.two
+select udf(uunique1) from
+  tenk1 t1 join tenk2 t2 on udf(t1.two) = udf(t2.two)
 -- !query 172 schema
 struct<>
 -- !query 172 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`uunique1`' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 7
+cannot resolve '`uunique1`' given input columns: [t1.even, t2.even, t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 11

 -- !query 173
-select f1,g from int4_tbl a, (select f1 as g) ss
+select udf(udf(f1,g)) from int4_tbl a, (select udf(udf(f1)) as g) ss
 -- !query 173 schema
 struct<>
 -- !query 173 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`f1`' given input columns: []; line 1 pos 37
+cannot resolve '`f1`' given input columns: []; line 1 pos 55

 -- !query 174
-select f1,g from int4_tbl a, (select a.f1 as g) ss
+select udf(f1,g) from int4_tbl a, (select a.f1 as g) ss
 -- !query 174 schema
 struct<>
 -- !query 174 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`a.f1`' given input columns: []; line 1 pos 37
+cannot resolve '`a.f1`' given input columns: []; line 1 pos 42

 -- !query 175
-select f1,g from int4_tbl a cross join (select f1 as g) ss
+select udf(udf(f1,g)) from int4_tbl a cross join (select udf(f1) as g) ss
 -- !query 175 schema
 struct<>
 -- !query 175 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`f1`' given input columns: []; line 1 pos 47
+cannot resolve '`f1`' given input columns: []; line 1 pos 61

 -- !query 176
-select f1,g from int4_tbl a cross join (select a.f1 as g) ss
+select udf(f1,g) from int4_tbl a cross join (select udf(udf(a.f1)) as g) ss
 -- !query 176 schema
 struct<>
 -- !query 176 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`a.f1`' given input columns: []; line 1 pos 47
+cannot resolve '`a.f1`' given input columns: []; line 1 pos 60

 -- !query 177
 -3383,8 +3383,8  struct<>

 -- !query 182
 select * from j1
-inner join j2 on j1.id1 = j2.id1 and j1.id2 = j2.id2
-where j1.id1 % 1000 = 1 and j2.id1 % 1000 = 1
+inner join j2 on udf(j1.id1) = udf(j2.id1) and udf(udf(j1.id2)) = udf(j2.id2)
+where udf(j1.id1) % 1000 = 1 and udf(udf(j2.id1) % 1000) = 1
 -- !query 182 schema
 struct<id1:int,id2:int,id1:int,id2:int>
 -- !query 182 output
```

</p>
</details>

## How was this patch tested?

Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).

Closes #25371 from huaxingao/spark-28393.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-19 20:10:56 +09:00
Wenchen Fan 97dc4c0bfc [SPARK-28744][SQL][TEST] rename SharedSQLContext to SharedSparkSession
## What changes were proposed in this pull request?

The Spark SQL test framework needs to support 2 kinds of tests:
1. tests inside Spark to test Spark itself (extends `SparkFunSuite`)
2. tests outside of Spark to test Spark applications (introduced at b57ed2245c)

The class hierarchy of the major testing traits:
![image](https://user-images.githubusercontent.com/3182036/63088526-c0f0af80-bf87-11e9-9bed-c144c2486da9.png)

`PlanTestBase`, `SQLTestUtilsBase` and `SharedSparkSession` intentionally don't extend `SparkFunSuite`, so that they can be used for tests outside of Spark. Tests in Spark should extend `QueryTest` and/or `SharedSQLContext` in most cases.

However, the name is a little confusing. As a result, some test suites extend `SharedSparkSession` instead of `SharedSQLContext`. `SharedSparkSession` doesn't work well with `SparkFunSuite` as it doesn't have the special handling of thread auditing in `SharedSQLContext`. For example, you will see a warning starting with `===== POSSIBLE THREAD LEAK IN SUITE` when you run `DataFrameSelfJoinSuite`.

This PR proposes to rename `SharedSparkSession` to `SharedSparkSessionBase`, and rename `SharedSQLContext` to `SharedSparkSession`.
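To illustrate the intended usage after the rename, here is a minimal sketch (the suite name and test body are hypothetical, not code from this PR): a Spark-internal suite simply extends `QueryTest with SharedSparkSession` and gets the shared session plus the thread-leak auditing that `SharedSQLContext` used to provide.

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite: after the rename, extending SharedSparkSession gives the
// shared SparkSession and the thread-auditing behavior described above.
class ExampleDataFrameSuite extends QueryTest with SharedSparkSession {
  import testImplicits._

  test("filter works with the shared session") {
    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
    checkAnswer(df.filter($"id" > 1), Seq(Row(2, "b")))
  }
}
```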

## How was this patch tested?


Closes #25463 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-19 19:01:56 +08:00
Peter Toth f999e00e9f [SPARK-28356][SHUFFLE][FOLLOWUP] Fix case with different pre-shuffle partition numbers
### What changes were proposed in this pull request?

This PR reverts some of the latest changes in `ReduceNumShufflePartitions` to fix the case when there are different pre-shuffle partition numbers in the plan. Please see the new UT for an example.

### Why are the changes needed?
Eliminate a bug.

### Does this PR introduce any user-facing change?
Yes, some queries that failed will succeed now.

### How was this patch tested?
Added new UT.

Closes #25479 from peter-toth/SPARK-28356-followup.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-19 15:53:43 +08:00
Dilip Biswal a5df5ff0fd [SPARK-28734][DOC] Initial table of content in the left hand side bar for SQL doc
## What changes were proposed in this pull request?
This is an initial PR that creates the table of contents for the SQL reference guide. The left side bar will display additional menu items corresponding to supported SQL constructs. Once this PR is merged, we will fill in the content incrementally. Additionally, this PR contains a minor change to make the left sidebar scrollable. Currently it is not possible to scroll in the left hand side window.

## How was this patch tested?
Used jekyll build and serve to verify.

Closes #25459 from dilipbiswal/ref-doc.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-18 23:17:50 -07:00
Eyal Zituny d75a11d059 [SPARK-27330][SS] support task abort in foreach writer
## What changes were proposed in this pull request?
In order to address cases where a foreach writer task fails without calling the close() method (for example, when a task is interrupted), this adds the option to implement an abort() method that will be called when the task is aborted. Users should handle resource cleanup (such as connections) in the abort() method.

## How was this patch tested?
Updated existing unit tests.

Closes #24382 from eyalzit/SPARK-27330-foreach-writer-abort.

Lead-authored-by: Eyal Zituny <eyal.zituny@equalum.io>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: eyalzit <eyal.zituny@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-19 14:12:48 +08:00
shivusondur c96b6154b7 [SPARK-28390][SQL][PYTHON][TESTS][FOLLOW-UP] Update the TODO with actual blocking JIRA IDs
## What changes were proposed in this pull request?
Only the TODO message is updated. udf() needs to be added for the GroupBy tests after resolving the following JIRAs:
[SPARK-28386] and [SPARK-26741]

## How was this patch tested?
N/A, only the TODO message is updated.

Closes #25415 from shivusondur/jiraFollowup.

Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-19 13:01:39 +09:00
WeichenXu 4ddad79060 [SPARK-28598][SQL] Few date time manipulation functions does not provide versions supporting Column as input through the Dataframe API
## What changes were proposed in this pull request?

Add the following functions:
```
def add_months(startDate: Column, numMonths: Column): Column
def date_add(start: Column, days: Column): Column
def date_sub(start: Column, days: Column): Column
```
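For example, the new overloads let the shift amount come from another column rather than a literal. A small usage sketch (the DataFrame and column names are made up; it assumes a spark-shell session or an active SparkSession named `spark`):

```scala
// Assumes an active SparkSession named `spark` (as in spark-shell).
import spark.implicits._
import org.apache.spark.sql.functions.{add_months, col, date_add, date_sub, to_date}

// Hypothetical data: a date column plus a per-row shift amount.
val df = Seq(("2019-08-19", 3)).toDF("d", "n")
  .withColumn("start", to_date(col("d")))

df.select(
  add_months(col("start"), col("n")),  // shift by a per-row number of months
  date_add(col("start"), col("n")),    // add a per-row number of days
  date_sub(col("start"), col("n"))     // subtract a per-row number of days
).show()
```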

## How was this patch tested?

UT.


Closes #25334 from WeichenXu123/datefunc_impr.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-19 11:41:13 +09:00
Dongjoon Hyun f0834d3a7f Revert "[SPARK-28527][SQL][TEST] Re-run all the tests in SQLQueryTestSuite via Thrift Server"
This reverts commit efbb035902.
2019-08-18 16:54:24 -07:00
Yizhong Zhang c097c555ac [SPARK-21067][DOC] Fix Thrift Server - CTAS fail with Unable to move source
## What changes were proposed in this pull request?

This PR aims to fix CTAS failures after we close a session of Thrift Server.

- sql-distributed-sql-engine.md
![image](https://user-images.githubusercontent.com/25916266/62509628-6f854980-b83e-11e9-9bea-daaf76c8f724.png)

It seems the simplest way to fix [[SPARK-21067]](https://issues.apache.org/jira/browse/SPARK-21067).

For example:
If we use HDFS, we can set the following property in hive-site.xml.
`<property>`
`  <name>fs.hdfs.impl.disable.cache</name>`
`  <value>true</value>`
`</property>`

## How was this patch tested?

Manual.

Closes #25364 from Deegue/fix_add_doc_file_system.

Authored-by: Yizhong Zhang <zyzzxycj@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-18 15:55:43 -05:00
Yuming Wang c308ab5a29 [MINOR][SQL] Make analysis error msg more meaningful on DISTINCT queries
## What changes were proposed in this pull request?

This PR makes analysis error messages more meaningful when the function does not support the modifier DISTINCT:
```sql
postgres=# select upper(distinct a) from (values('a'), ('b')) v(a);
ERROR:  DISTINCT specified, but upper is not an aggregate function
LINE 1: select upper(distinct a) from (values('a'), ('b')) v(a);

spark-sql> select upper(distinct a) from (values('a'), ('b')) v(a);
Error in query: upper does not support the modifier DISTINCT; line 1 pos 7
spark-sql>
```

After this pr:
```sql
spark-sql> select upper(distinct a) from (values('a'), ('b')) v(a);
Error in query: DISTINCT specified, but upper is not an aggregate function; line 1 pos 7
spark-sql>

```

## How was this patch tested?

Unit test

Closes #25486 from wangyum/DISTINCT.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-18 08:36:01 -07:00
Yuming Wang efbb035902 [SPARK-28527][SQL][TEST] Re-run all the tests in SQLQueryTestSuite via Thrift Server
## What changes were proposed in this pull request?

This PR builds a test framework that directly re-runs all the tests in `SQLQueryTestSuite` via Thrift Server. But it's a little different from `SQLQueryTestSuite`:
1. It cannot support [UDF testing](44e607e921/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala (L293-L297)).
2. It cannot support the `DESC` and `SHOW` commands because `SQLQueryTestSuite` [formats the output](1882912cca/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala (L38-L50)).

While building this framework, two bugs were found:
[SPARK-28624](https://issues.apache.org/jira/browse/SPARK-28624): `make_date` is inconsistent when reading from table
[SPARK-28611](https://issues.apache.org/jira/browse/SPARK-28611): Histogram's height is different

Two features were found that Thrift Server cannot support:
[SPARK-28636](https://issues.apache.org/jira/browse/SPARK-28636): ThriftServer can not support decimal type with negative scale
[SPARK-28637](https://issues.apache.org/jira/browse/SPARK-28637): ThriftServer can not support interval type

Also, two inconsistent behaviors were found:
[SPARK-28620](https://issues.apache.org/jira/browse/SPARK-28620): Double type returned for float type in Beeline/JDBC
[SPARK-28619](https://issues.apache.org/jira/browse/SPARK-28619):  The golden result file is different when tested by `bin/spark-sql`

## How was this patch tested?

N/A

Closes #25373 from wangyum/SPARK-28527.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-17 19:12:50 -07:00
Dongjoon Hyun 5756a47a9f [SPARK-28766][R][DOC] Fix CRAN incoming feasibility warning on invalid URL
### What changes were proposed in this pull request?

This updates a URL in the R docs to fix `Had CRAN check errors; see logs`.

### Why are the changes needed?
Currently, this invalid link causes a warning during CRAN incoming feasibility. We had better fix this before submitting `3.0.0/2.4.4/2.3.4`.

**BEFORE**
```
* checking CRAN incoming feasibility ... NOTE
Maintainer: ‘Shivaram Venkataraman <shivaramcs.berkeley.edu>’

Found the following (possibly) invalid URLs:
  URL: https://wiki.apache.org/hadoop/HCFS (moved to https://cwiki.apache.org/confluence/display/hadoop/HCFS)
    From: man/spark.addFile.Rd
    Status: 404
    Message: Not Found
```

**AFTER**
```
* checking CRAN incoming feasibility ... Note_to_CRAN_maintainers
Maintainer: ‘Shivaram Venkataraman <shivaramcs.berkeley.edu>’
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Check the warning message during R testing.
```
$ R/install-dev.sh
$ R/run-tests.sh
```

Closes #25483 from dongjoon-hyun/SPARK-28766.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-17 11:11:36 -07:00
Dongjoon Hyun f7c9de9035 [SPARK-28765][BUILD] Add explict exclusions to avoid JDK11 dependency issue
### What changes were proposed in this pull request?

This PR adds explicit exclusions to avoid Maven `JDK11` dependency issues.

### Why are the changes needed?

Maven/Ivy seems to be confused during dependency generation in a `JDK11` environment.
This is not only wrong, but also causes a Jenkins failure during the dependency manifest check in a `JDK11` environment.

**JDK8**
```
$ cd core
$ mvn -X dependency:tree -Dincludes=jakarta.activation:jakarta.activation-api
...
[DEBUG]       org.glassfish.jersey.core:jersey-server:jar:2.29:compile (version managed from 2.22.2)
[DEBUG]          org.glassfish.jersey.media:jersey-media-jaxb:jar:2.29:compile
[DEBUG]          javax.validation:validation-api:jar:2.0.1.Final:compile
```

**JDK11**
```
[DEBUG]       org.glassfish.jersey.core:jersey-server:jar:2.29:compile (version managed from 2.22.2)
[DEBUG]          org.glassfish.jersey.media:jersey-media-jaxb:jar:2.29:compile
[DEBUG]          javax.validation:validation-api:jar:2.0.1.Final:compile
[DEBUG]          jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2:compile
[DEBUG]             jakarta.activation:jakarta.activation-api:jar:1.2.1:compile
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Do the following in both `JDK8` and `JDK11` environment. The dependency manifest should not be changed. In the current `master` branch, `JDK11` changes the dependency manifest.
```
$ dev/test-dependencies.sh --replace-manifest
```

Closes #25481 from dongjoon-hyun/SPARK-28765.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-17 10:16:22 -07:00
Dongjoon Hyun 1819a6f22e [SPARK-28759][BUILD] Upgrade scala-maven-plugin to 4.1.1
### What changes were proposed in this pull request?

This PR aims to upgrade `scala-maven-plugin` to 4.1.1 to bring in improvements (including Scala 2.13.0 support and a Zinc update) and bug fixes in the plugin.

### Why are the changes needed?

`4.1.1` uses the latest dependent plugins.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins.

Closes #25476 from dongjoon-hyun/SPARK-28759.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-16 21:23:11 -07:00
Sean Owen c9b49f3978 [SPARK-28737][CORE] Update Jersey to 2.29
## What changes were proposed in this pull request?

Update Jersey to 2.27+, ideally 2.29, for possible JDK 11 fixes.

## How was this patch tested?

Existing tests.

Closes #25455 from srowen/SPARK-28737.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-16 15:08:04 -07:00
Dongjoon Hyun 43101c7328 [SPARK-28758][BUILD][SQL] Upgrade Janino to 3.0.15
### What changes were proposed in this pull request?

This PR aims to upgrade `Janino` from `3.0.13` to `3.0.15` in order to bring in its bug fixes. Please note that `3.1.0` is a major refactoring rather than a bug-fix release. We had better use `3.0.15` and wait for a more stable 3.1.x.

### Why are the changes needed?

This brings the following bug fixes.

**3.0.15 (2019-07-28)**

- Fix overloaded single static method import

**3.0.14 (2019-07-05)**

- Conflict in sbt-assembly
- Overloaded static on-demand imported methods cause a CompileException: Ambiguous static method import
- Handle overloaded static on-demand imports
- Major refactoring of the Java 8 and Java 9 retrofit mechanism
- Added tests for "JLS8 8.6 Instance Initializers" and "JLS8 8.7 Static Initializers"
- Local variables in instance initializers don't work
- Provide an option to keep generated code files
- Added compile error handler and warning handler to ICompiler

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #25474 from dongjoon-hyun/SPARK-28758.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-16 11:33:02 -07:00
Liang-Chi Hsieh 0094b5fe72 [SPARK-28722][ML] Change sequential label sorting in StringIndexer fit to parallel
## What changes were proposed in this pull request?

The `fit` method in `StringIndexer` sorts the given labels sequentially when there are multiple input columns. As the number of input columns increases, the label-sorting time increases dramatically, making it hard to use in practice with hundreds of input columns.

This patch tries to make the label sorting parallel.
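Conceptually the change only affects how the per-column label sorting is scheduled. A toy sketch with Scala parallel collections (illustrative only, not the PR's code):

```scala
// Toy label sets for three hypothetical input columns.
val labelsPerColumn: Seq[Seq[String]] = Seq(
  Seq("b", "a", "c"),
  Seq("z", "y"),
  Seq("n", "k", "m")
)

// Before: columns are processed one after another.
val sequentiallySorted = labelsPerColumn.map(_.sorted)

// After (conceptually): each column's labels are sorted in a parallel task,
// so wall-clock time no longer grows linearly with the number of columns.
val parallelSorted = labelsPerColumn.par.map(_.sorted).seq
```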

This runs benchmark like:
```scala
import org.apache.spark.ml.feature.StringIndexer

val numCol = 300

val data = (0 to 100).map { i =>
  (i, 100 * i)
}
var df = data.toDF("id", "label0")
(1 to numCol).foreach { idx =>
  df = df.withColumn(s"label$idx", col("label0") + 1)
}
val inputCols = (0 to numCol).map(i => s"label$i").toArray
val outputCols = (0 to numCol).map(i => s"labelIndex$i").toArray
val t0 = System.nanoTime()
val indexer = new StringIndexer().setInputCols(inputCols).setOutputCols(outputCols).setStringOrderType("alphabetDesc").fit(df)
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) / 1000000000.0 + "s")
```

| numCol  | 20 | 50  | 100  | 200  | 300 |
|--:|---|---|---|---|---|
|  Before |  9.85 |  28.62 | 64.35  | 167.17  | 431.60 |
| After  | 2.44  | 2.71  | 3.34  | 4.83  | 6.90 |

Unit: second

## How was this patch tested?

Passed existing tests. Manually test for performance.

Closes #25442 from viirya/improve_stringindexer2.

Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-16 12:39:12 -05:00
HyukjinKwon 7f44a6e367 [SPARK-28755][R][TESTS] Increase tolerance in 'spark.mlp' SparkR test for JDK 11

### What changes were proposed in this pull request?

This PR proposes to increase the tolerance for the exact value comparison in `spark.mlp` test. I don't know the root cause but some tolerance is already expected. I suspect it is not a big deal considering all other tests pass.

The values are fairly close:

JDK 8:

```
-24.28415, 107.8701, 16.86376, 1.103736, 9.244488
```

JDK 11:

```
-24.33892, 108.0316, 16.89082, 1.090723, 9.260533
```

### Why are the changes needed?

To fully support JDK 11. See, for instance, #25443 and #25423 for ongoing efforts.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Manually tested on the top of https://github.com/apache/spark/pull/25472 with JDK 11

```bash
./build/mvn -DskipTests -Psparkr -Phadoop-3.2 package
./bin/sparkR
```

```R
absoluteSparkPath <- function(x) {
  sparkHome <- sparkR.conf("spark.home")
  file.path(sparkHome, x)
}
df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
              source = "libsvm")
model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
                   solver = "l-bfgs", maxIter = 100, tol = 0.00001, stepSize = 1, seed = 1)
summary <- summary(model)
head(summary$weights, 5)
```

Closes #25478 from HyukjinKwon/SPARK-28755.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-16 10:03:14 -07:00
Dongjoon Hyun 2f04152921 [SPARK-28756][R] Fix checkJavaVersion to accept JDK8+
### What changes were proposed in this pull request?
Currently, `checkJavaVersion` only accepts JDK8 because it compares against the number in `SystemRequirements`. This PR changes it to also accept higher versions.

### Why are the changes needed?
Without this, two test suites are skipped on JDK11 environment due to this check.

**BEFORE**
```
$ build/mvn -Phadoop-3.2 -Psparkr -DskipTests package
$ R/install-dev.sh
$ R/run-tests.sh
...
basic tests for CRAN: SS

Skipped ------------------------------------------------------------------------
1. create DataFrame from list or data.frame (test_basic.R#21) - error on Java check
2. spark.glm and predict (test_basic.R#57) - error on Java check
DONE ===========================================================================
```

**AFTER**
```
basic tests for CRAN: .............

DONE ===========================================================================
```

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?

Manually, build and test on JDK11.

Closes #25472 from dongjoon-hyun/SPARK-28756.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-16 10:01:59 -07:00
angerszhu 036fd3903f [SPARK-27637][SHUFFLE][FOLLOW-UP] For nettyBlockTransferService, if IOException occurred while create client, check whether relative executor is alive before retry #24533
### What changes were proposed in this pull request?

PR [#24533](https://github.com/apache/spark/pull/24533/files) prevented retrying against a removed executor.
In my test, I couldn't catch exceptions from
`new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, transportConf, tempFileManager).start()`.
Checking the code carefully, the **start()** method handles IOException within its retry logic and won't throw it out until it reaches the max retry count or hits an exception that is not an IOException.

If the executor is dead when we fetch a block, then when we rerun
`RetryingBlockFetcher.BlockFetchStarter.createAndStart()`
we may fail to create a transport client to the dead executor, which throws an IOException. We should catch this IOException and check whether the executor is still alive before retrying.
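The shape of the fix can be sketched as follows (purely illustrative Scala with made-up helper names; this is not the actual NettyBlockTransferService code):

```scala
import java.io.IOException

// Hypothetical helpers: createClientAndFetch opens a transport client and starts
// the fetch; isExecutorAlive asks the driver whether the executor still exists.
def fetchWithGuard(execId: String,
                   createClientAndFetch: () => Unit,
                   isExecutorAlive: String => Boolean): Unit = {
  try {
    createClientAndFetch()
  } catch {
    case e: IOException =>
      // Creating a transport client to a dead executor fails with IOException;
      // retrying is only useful if the executor is actually still alive.
      if (isExecutorAlive(execId)) createClientAndFetch()
      else throw new IOException(s"Executor $execId is dead, not retrying", e)
  }
}
```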

### Why are the changes needed?
The old solution was not comprehensive; it didn't cover this case.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #25469 from AngersZhuuuu/SPARK-27637-FLLOW-UP.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-16 23:24:32 +08:00
Gengliang Wang 92bfd9a317 [SPARK-28757][SQL] File table location should include both values of option path and paths
### What changes were proposed in this pull request?
If both options `path` and `paths` are passed to file data source v2, both values of the options should be included as the target paths.

### Why are the changes needed?
In V1 implementation, file table location includes both values of option `path` and `paths`.
In the refactoring of https://github.com/apache/spark/pull/24025, the value of option `path` is ignored if "paths" are specified. We should make it consistent with V1.
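In other words, a read like the following should pick up all three locations, not only the ones under `paths` (a hedged sketch with made-up paths; it assumes the `paths` option carries a JSON-encoded array of strings, as in the V1 file sources, and an active SparkSession named `spark`):

```scala
// Hypothetical locations; with this fix, /data/a is included alongside /data/b and /data/c.
val df = spark.read
  .format("parquet")
  .option("path", "/data/a")
  .option("paths", """["/data/b", "/data/c"]""")
  .load()
```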

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Unit test

Closes #25473 from gengliangwang/fixPathOption.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-16 22:27:27 +08:00
pavithra c48e381214 [SPARK-28671][SQL] Throw NoSuchPermanentFunctionException for a non-existent permanent function in dropFunction/alterFunction
## What changes were proposed in this pull request?
**Before Fix**
When a non-existent permanent function was dropped, a generic NoSuchFunctionException was thrown, which printed "This function is neither a registered temporary function nor a permanent function registered in the database".
This creates ambiguity when a temporary function with the same name exists.

**After Fix**
 NoSuchPermanentFunctionException will be thrown, which will print
"NoSuchPermanentFunctionException:Function not found in database "

## How was this patch tested?
Unit tests were run and the existing UT was corrected.

Closes #25394 from PavithraRamachandran/funcIssue.

Lead-authored-by: pavithra <pavi.rams@gmail.com>
Co-authored-by: pavithraramachandran <pavi.rams@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-08-16 22:46:04 +09:00
HyukjinKwon ef142371e7 [SPARK-28736][SPARK-28735][PYTHON][ML] Fix PySpark ML tests to pass in JDK 11

### What changes were proposed in this pull request?

This PR proposes to fix both tests below:

```
======================================================================
FAIL: test_raw_and_probability_prediction (pyspark.ml.tests.test_algorithms.MultilayerPerceptronClassifierTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/dongjoon/APACHE/spark-master/python/pyspark/ml/tests/test_algorithms.py", line 89, in test_raw_and_probability_prediction
    self.assertTrue(np.allclose(result.rawPrediction, expected_rawPrediction, atol=1E-4))
AssertionError: False is not true
```

```
File "/Users/dongjoon/APACHE/spark-master/python/pyspark/mllib/clustering.py", line 386, in __main__.GaussianMixtureModel
Failed example:
    abs(softPredicted[0] - 1.0) < 0.001
Expected:
    True
Got:
    False
**********************************************************************
File "/Users/dongjoon/APACHE/spark-master/python/pyspark/mllib/clustering.py", line 388, in __main__.GaussianMixtureModel
Failed example:
    abs(softPredicted[1] - 0.0) < 0.001
Expected:
    True
Got:
    False
```

to pass in JDK 11.

The root cause seems to be different float values being understood via Py4J. This issue also was found in https://github.com/apache/spark/pull/25132 before.

When floats are transferred from Python to the JVM, the values are sent as-is. Python floats are not "precise" due to their own limitation - https://docs.python.org/3/tutorial/floatingpoint.html.
For some reason, the floats from Python on JDK 8 and JDK 11 are different, which is already explicitly not guaranteed.

This seems to be why only some PySpark tests involving floats fail.

So, this PR fixes it by increasing tolerance in identified test cases in PySpark.

### Why are the changes needed?

To fully support JDK 11. See, for instance, https://github.com/apache/spark/pull/25443 and https://github.com/apache/spark/pull/25423 for ongoing efforts.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually tested as described in JIRAs:

```
$ build/sbt -Phadoop-3.2 test:package
$ python/run-tests --testnames 'pyspark.ml.tests.test_algorithms' --python-executables python
```

```
$ build/sbt -Phadoop-3.2 test:package
$ python/run-tests --testnames 'pyspark.mllib.clustering' --python-executables python
```

Closes #25475 from HyukjinKwon/SPARK-28735.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-16 19:47:29 +09:00
Fokko Driesprong babdba0f9e [SPARK-28728][BUILD] Bump Jackson Databind to 2.9.9.3
## What changes were proposed in this pull request?

Update Jackson Databind to the latest version to pick up recent fixes.

## How was this patch tested?

Pass the Jenkins.

Closes #25451 from Fokko/fd-bump-jackson-databind.

Lead-authored-by: Fokko Driesprong <fokko@apache.org>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-16 03:40:41 -07:00
HyukjinKwon 0ea8db9fd3 [SPARK-28578][INFRA] Improve Github pull request template
<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
  2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
  4. Be sure to keep the PR description updated to reflect all changes.
  5. Please write your PR title to summarize what this PR proposes.
  6. If possible, provide a concise example to reproduce the issue for a faster review.
-->

### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
  1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
  2. If you fix some SQL features, you can provide some references of other DBMSes.
  3. If there is design documentation, please add the link.
  4. If there is a discussion in the mailing list, please add the link.
-->

This PR proposes to improve the Github template for better and faster review iterations and better interactions between PR authors and reviewers.

As suggested in the [dev mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-New-sections-in-Github-Pull-Request-description-template-td27527.html), this PR referred to [Kubernetes' PR template](https://raw.githubusercontent.com/kubernetes/kubernetes/master/.github/PULL_REQUEST_TEMPLATE.md).

Therefore, those fields are newly added:

```
### Why are the changes needed?
### Does this PR introduce any user-facing change?
```

and some comments were added.

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Currently, many PR descriptions are poorly formatted, which causes overhead between PR authors and reviewers.

There are multiple problems caused by those poorly formatted PR descriptions:

- Some PRs still have a single-line description even with 500+ lines of code changed in a critical path.
- Some PRs do not describe behaviour changes, and reviewers need to find and document them.
- Some PRs are hard to review without an outline, but one is sometimes not provided.
- Spark is getting old, and sometimes we need to track its history deeply. Due to poorly formatted PR descriptions, it sometimes requires reading the whole code of whole commit histories to find the root cause of a bug.
- Reviews take a while, but the number of PRs still grows.

This PR aims to alleviate these problems.

### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->

Yes, it changes the PR template shown when PRs are opened. This PR itself uses the template it proposes.

### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->

Manually tested via Github preview feature.

Closes #25310 from HyukjinKwon/SPARK-28578.

Lead-authored-by: HyukjinKwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-16 09:45:15 +09:00
DB Tsai c2b40a76bb [SPARK-28719][BUILD][FOLLOWUP] Make Github Actions log quieter
## What changes were proposed in this pull request?

Make Github Actions log quieter

Closes #25468 from dbtsai/actions2.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-08-15 22:22:44 +00:00
Maxim Gekk 96ca734fb7 [SPARK-28745][SQL][TEST] Add benchmarks for extract()
## What changes were proposed in this pull request?

Added new benchmark `ExtractBenchmark` for the `EXTRACT(field FROM source)` function. It was executed on all currently supported values of the `field` argument:  `MILLENNIUM`, `CENTURY`, `DECADE`, `YEAR`, `ISOYEAR`, `QUARTER`, `MONTH`, `WEEK`, `DAY`, `DAYOFWEEK`, `HOUR`, `MINUTE`, `SECOND`, `MILLISECONDS`, `MICROSECONDS`, `EPOCH`. The `cast(id as timestamp)` was taken as the `source` argument.
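The benchmarked expression looks like this for a single field (a small sketch; it assumes an active SparkSession named `spark`):

```scala
// One of the benchmarked shapes: extract a field from cast(id as timestamp).
spark.sql("SELECT EXTRACT(YEAR FROM CAST(id AS TIMESTAMP)) FROM range(5)").show()
```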

## How was this patch tested?

By running the benchmark via:
```
$ SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.ExtractBenchmark"
```

Closes #25462 from MaxGekk/extract-benchmark.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-15 12:44:36 -07:00
Dongjoon Hyun 123eb58d61 [MINOR][DOC] Use Java 8 instead of Java 8+ as a running environment
## What changes were proposed in this pull request?

Once Apache Spark 3.0.0 officially supports JDK11, people will try JDK11 on old Spark releases (especially 2.4.4/2.3.4) in the same way because our documentation says `Java 8+`. We had better avoid that misleading situation.

This PR aims to remove `+` from `Java 8+` in the documentation (master/2.4/2.3). Especially, 2.4.4 release and 2.3.4 release (cc kiszk )

On master branch, we will add JDK11 after [SPARK-24417.](https://issues.apache.org/jira/browse/SPARK-24417)

## How was this patch tested?

This is a documentation only change.

<img width="923" alt="java8" src="https://user-images.githubusercontent.com/9700541/63116589-e1504800-bf4e-11e9-8904-b160ec7a42c0.png">

Closes #25466 from dongjoon-hyun/SPARK-DOC-JDK8.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-15 11:22:57 -07:00
Steve Loughran 2ac6163a5d [SPARK-23977][SQL] Support High Performance S3A committers [test-hadoop3.2]
This patch adds the binding classes to enable Spark to switch dataframe output to using the S3A zero-rename committers shipping in Hadoop 3.1+. It adds a source tree to the hadoop-cloud-storage module which only compiles with the hadoop-3.2 profile, and contains a binding for normal output and a specific bridge class for Parquet (as the Parquet output format requires a subclass of `ParquetOutputCommitter`).
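Wiring the committers in looks roughly like this (a hedged sketch: the property names follow the hadoop-cloud binding classes and the S3A committer documentation, and should be verified against the docs rather than treated as the exact settings from this patch):

```scala
import org.apache.spark.sql.SparkSession

// Assumed configuration keys: switch the commit protocol and the Parquet
// committer to the cloud bindings, and pick an S3A committer ("directory").
val spark = SparkSession.builder()
  .appName("s3a-committer-example")
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```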

Commit algorithms are a critical topic. There's no formal proof of correctness, but the algorithms are documented and analysed in [A Zero Rename Committer](https://github.com/steveloughran/zero-rename-committer/releases). This also reviews the classic v1 and v2 algorithms, IBM's Swift committer, and the one from EMRFS, which they admit was based on the concepts implemented here.

Test-wise

* There's a public set of scala test suites [on github](https://github.com/hortonworks-spark/cloud-integration)
* We have run integration tests against Spark on Yarn clusters.
* This code has been shipping for ~12 months in HDP-3.x.

Closes #24970 from steveloughran/cloud/SPARK-23977-s3a-committer.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-15 09:39:26 -07:00
Huaxin Gao ba5ee27706 [SPARK-28243][PYSPARK][ML][FOLLOW-UP] Move Python DecisionTreeParams to regression.py
## What changes were proposed in this pull request?
Leave ```shared.py``` untouched. Move Python ```DecisionTreeParams``` to ```regression.py```

## How was this patch tested?
Use existing tests

Closes #25406 from huaxingao/spark-28243.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-15 10:21:26 -05:00
Unknown 3f35440304 [SPARK-28543][DOCS][WEBUI] Document Spark Jobs page
## What changes were proposed in this pull request?

New documentation that explains the Web UI Jobs page in detail and links it to the monitoring page. New images are included for better explanation.

![image](https://user-images.githubusercontent.com/12819544/62898145-2741bc00-bd55-11e9-89f7-175a4fd81009.png)
![image](https://user-images.githubusercontent.com/12819544/62898187-39235f00-bd55-11e9-9f03-a4d179e197fe.png)

## How was this patch tested?

This pull request contains only documentation. I have generated it using "jekyll build" to ensure that it's ok

Closes #25424 from planga82/feature/SPARK-28543_ImproveWebUIDocs.

Lead-authored-by: Unknown <soypab@gmail.com>
Co-authored-by: Pablo <soypab@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-15 08:52:23 -05:00
Yuming Wang 1b416a0c77 [SPARK-27592][SQL] Set the bucketed data source table SerDe correctly
## What changes were proposed in this pull request?

Hive uses an incorrect **InputFormat** (`org.apache.hadoop.mapred.SequenceFileInputFormat`) to read Spark's **Parquet** bucketed data source tables.
Spark side:
```sql
spark-sql> CREATE TABLE t (c1 INT, c2 INT) USING parquet CLUSTERED BY (c1) SORTED BY (c1) INTO 2 BUCKETS;
2019-04-29 17:52:05 WARN  HiveExternalCatalog:66 - Persisting bucketed data source table `default`.`t` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
spark-sql> DESC FORMATTED t;
c1	int	NULL
c2	int	NULL

# Detailed Table Information
Database	default
Table	t
Owner	yumwang
Created Time	Mon Apr 29 17:52:05 CST 2019
Last Access	Thu Jan 01 08:00:00 CST 1970
Created By	Spark 2.4.0
Type	MANAGED
Provider	parquet
Num Buckets	2
Bucket Columns	[`c1`]
Sort Columns	[`c1`]
Table Properties	[transient_lastDdlTime=1556531525]
Location	file:/user/hive/warehouse/t
Serde Library	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat	org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties	[serialization.format=1]
```
Hive side:
```sql
hive> DESC FORMATTED t;
OK
# col_name            	data_type           	comment

c1                  	int
c2                  	int

# Detailed Table Information
Database:           	default
Owner:              	root
CreateTime:         	Wed May 08 03:38:46 GMT-07:00 2019
LastAccessTime:     	UNKNOWN
Retention:          	0
Location:           	file:/user/hive/warehouse/t
Table Type:         	MANAGED_TABLE
Table Parameters:
	bucketing_version   	spark
	spark.sql.create.version	3.0.0-SNAPSHOT
	spark.sql.sources.provider	parquet
	spark.sql.sources.schema.bucketCol.0	c1
	spark.sql.sources.schema.numBucketCols	1
	spark.sql.sources.schema.numBuckets	2
	spark.sql.sources.schema.numParts	1
	spark.sql.sources.schema.numSortCols	1
	spark.sql.sources.schema.part.0	{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}
	spark.sql.sources.schema.sortCol.0	c1
	transient_lastDdlTime	1557311926

# Storage Information
SerDe Library:      	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:        	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:       	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:         	No
Num Buckets:        	-1
Bucket Columns:     	[]
Sort Columns:       	[]
Storage Desc Params:
	path                	file:/user/hive/warehouse/t
	serialization.format	1
```

So it is a non-bucketed table on the Hive side. This PR sets the `SerDe` correctly so that Hive can read these tables.

Related code:
33f3c48cac/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala (L976-L990)
f9776e3892/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala (L444-L459)
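
For illustration only, a minimal Scala sketch of the idea behind the fix, using a hypothetical helper rather than the actual `HiveClientImpl`/`HiveExternalCatalog` code: map the data source provider to Hive-compatible SerDe/InputFormat/OutputFormat class names instead of falling back to the SequenceFile defaults.
```scala
// Hypothetical helper sketching the fix: pick Hive storage classes based on
// the Spark data source provider instead of the SequenceFile defaults.
case class HiveStorage(serde: String, inputFormat: String, outputFormat: String)

def storageFor(provider: String): Option[HiveStorage] = provider.toLowerCase match {
  case "parquet" => Some(HiveStorage(
    serde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
    inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
    outputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
  case "orc" => Some(HiveStorage(
    serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde",
    inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
    outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
  case _ => None // unknown providers keep the existing behaviour
}
```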

## How was this patch tested?

Unit tests.

Closes #24486 from wangyum/SPARK-27592.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-15 17:21:13 +08:00
Gabor Somogyi a493031e2e [SPARK-28695][SS] Use CaseInsensitiveMap in KafkaSourceProvider to make source param handling more robust
## What changes were proposed in this pull request?

[SPARK-28163](https://issues.apache.org/jira/browse/SPARK-28163) fixed a bug, and during the analysis we concluded it would be more robust to use `CaseInsensitiveMap` inside the Kafka source. This way, fewer lower/upper-case problems would arise in the future.

Please note this PR doesn't intend to solve any actual problem but to finish the concept added in [SPARK-28163](https://issues.apache.org/jira/browse/SPARK-28163) (I didn't want to add overly invasive changes in a fix PR). In this PR I've changed `Map[String, String]` to `CaseInsensitiveMap[String]` to enforce its usage. These are the main use-cases (a small sketch of the lookup behaviour follows the list):
* `contains` => `CaseInsensitiveMap` solves it
* `get...` => `CaseInsensitiveMap` solves it
* `filter` => keys must be converted to lowercase because there is no guarantee that the incoming map has such key set
* `find` => keys must be converted to lowercase because there is no guarantee that the incoming map has such key set
* passing parameters to Kafka consumer/producer => keys must be converted to lowercase because there is no guarantee that the incoming map has such key set
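
A small self-contained sketch of the lookup behaviour, using a simplified stand-in for Spark's `CaseInsensitiveMap` rather than its actual implementation:
```scala
// Simplified stand-in for CaseInsensitiveMap: keys are normalized to lower
// case once, so contains/get work no matter how the caller cases the option.
class CIMap[T](original: Map[String, T]) {
  private val normalized = original.map { case (k, v) => k.toLowerCase -> v }
  def contains(key: String): Boolean = normalized.contains(key.toLowerCase)
  def get(key: String): Option[T] = normalized.get(key.toLowerCase)
}

val params = new CIMap(Map("kafka.Bootstrap.Servers" -> "broker:9092"))
params.contains("kafka.bootstrap.servers") // true, regardless of casing
params.get("KAFKA.BOOTSTRAP.SERVERS")      // Some("broker:9092")
```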

## How was this patch tested?

Existing unit tests.

Closes #25418 from gaborgsomogyi/SPARK-28695.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-15 14:43:52 +08:00
Burak Yavuz 0526529b31 [SPARK-28666] Support saveAsTable for V2 tables through Session Catalog
## What changes were proposed in this pull request?

We add support for the V2SessionCatalog for saveAsTable, such that V2 tables can plug in and leverage existing DataFrameWriter.saveAsTable APIs to write and create tables through the session catalog.
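
A hypothetical usage example; the source short name below is assumed and not part of the PR:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df = spark.range(10).toDF("id")

// With this change, a table backed by a V2 source registered in the session
// catalog can be created and written through the existing saveAsTable API.
df.write
  .format("my-v2-source") // assumed short name of a V2 data source
  .mode("append")
  .saveAsTable("events")
```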

## How was this patch tested?

Unit tests. A lot of tests broke under Hive when things were not working properly in `ResolveTables`, so I believe the current set of tests is sufficient for testing the table resolution and read code paths.

Closes #25402 from brkyvz/saveAsV2.

Lead-authored-by: Burak Yavuz <brkyvz@gmail.com>
Co-authored-by: Burak Yavuz <burak@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-15 12:29:34 +08:00
Xianjin YE 3ec24fd128 [SPARK-28203][CORE][PYTHON] PythonRDD should respect SparkContext's hadoop configuration
## What changes were proposed in this pull request?
1. `PythonHadoopUtil.mapToConf` generates a `Configuration` with `loadDefaults` disabled.
2. Merging the Hadoop conf in several places of `PythonRDD` is made consistent (see the sketch after this list).
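
A hypothetical helper illustrating the two points above (not the actual `PythonRDD` code), assuming the base configuration comes from `sc.hadoopConfiguration`:
```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

// Build a Configuration from user-supplied overrides without loading the
// default *-site.xml resources (loadDefaults = false), then merge it on top
// of the SparkContext's base Hadoop configuration.
def mergedConf(base: Configuration, overrides: Map[String, String]): Configuration = {
  val result = new Configuration(base)  // copy of sc.hadoopConfiguration
  val delta = new Configuration(false)  // defaults disabled, as in point 1
  overrides.foreach { case (k, v) => delta.set(k, v) }
  delta.asScala.foreach(e => result.set(e.getKey, e.getValue))
  result
}
```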

## How was this patch tested?
Added a new test and ran existing tests.

Closes #25002 from advancedxy/SPARK-28203.

Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-15 10:39:33 +09:00
Maxim Gekk 3a4afce96c [SPARK-28687][SQL] Support epoch, isoyear, milliseconds and microseconds at extract()
## What changes were proposed in this pull request?

In the PR, I propose new expressions `Epoch`, `IsoYear`, `Milliseconds` and `Microseconds`, and support additional parameters of `extract()` for feature parity with PostgreSQL (https://www.postgresql.org/docs/11/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT):

1. `epoch` - the number of seconds since 1970-01-01 00:00:00 local time in microsecond precision.
2. `isoyear` - the ISO 8601 week-numbering year that the date falls in. Each ISO 8601 week-numbering year begins with the Monday of the week containing the 4th of January.
3. `milliseconds` - the seconds field including fractional parts multiplied by 1,000.
4. `microseconds` - the seconds field including fractional parts multiplied by 1,000,000.

Here are examples:
```sql
spark-sql> SELECT EXTRACT(EPOCH FROM TIMESTAMP '2019-08-11 19:07:30.123456');
1565550450.123456
spark-sql> SELECT EXTRACT(ISOYEAR FROM DATE '2006-01-01');
2005
spark-sql> SELECT EXTRACT(MILLISECONDS FROM TIMESTAMP '2019-08-11 19:07:30.123456');
30123.456
spark-sql> SELECT EXTRACT(MICROSECONDS FROM TIMESTAMP '2019-08-11 19:07:30.123456');
30123456
```
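
A rough cross-check of the field definitions with plain `java.time` (not Spark's implementation; the UTC zone used for `epoch` is an assumption about the session time zone):
```scala
import java.time.{LocalDate, LocalDateTime, ZoneOffset}
import java.time.temporal.IsoFields

val ts = LocalDateTime.parse("2019-08-11T19:07:30.123456")

// Seconds since 1970-01-01 00:00:00, keeping the fractional part.
val epoch = ts.toEpochSecond(ZoneOffset.UTC) + ts.getNano / 1e9

// Seconds field including the fractional part, scaled to milli/microseconds.
val seconds = ts.getSecond + ts.getNano / 1e9
val milliseconds = seconds * 1e3 // 30123.456
val microseconds = seconds * 1e6 // ~30123456

// ISO 8601 week-numbering year: 2006-01-01 falls in ISO year 2005.
val isoyear = LocalDate.parse("2006-01-01").get(IsoFields.WEEK_BASED_YEAR)
```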

## How was this patch tested?

Added new tests to `DateExpressionsSuite`, and uncommented existing tests in `extract.sql` and `pgSQL/date.sql`.

Closes #25408 from MaxGekk/extract-ext3.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-14 08:44:44 -07:00
xy_xin 2eeb25e52d [SPARK-28351][SQL] Support DELETE in DataSource V2
## What changes were proposed in this pull request?

This PR adds DELETE support for V2 data sources. As a first step, this PR only supports delete by source filters:
```scala
void delete(Filter[] filters);
```
which could not deal with complicated cases like subqueries.

Since it is awkward to embed the implementation of DELETE in the current V2 APIs, a new data source mix-in called `SupportsMaintenance` is added, similar to `SupportsRead` and `SupportsWrite`. A data source that can be maintained means we can perform DELETE/UPDATE/MERGE/OPTIMIZE on it, as long as it implements the necessary mix-ins.
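
A minimal Scala sketch of what a delete-by-filter implementation could look like for a toy in-memory source; the trait below is an illustrative stand-in for the mix-in described above, and only the `delete(Filter[])` shape comes from the PR:
```scala
import org.apache.spark.sql.sources.{EqualTo, Filter}

// Illustrative stand-in for the maintenance mix-in: a source that can delete
// rows matching the pushed-down source filters.
trait SupportsDeleteByFilter {
  def delete(filters: Array[Filter]): Unit
}

class InMemoryTable(var rows: Seq[Map[String, Any]]) extends SupportsDeleteByFilter {
  override def delete(filters: Array[Filter]): Unit = {
    // Drop every row that satisfies all of the pushed filters.
    rows = rows.filterNot { row =>
      filters.forall {
        case EqualTo(attribute, value) => row.get(attribute).contains(value)
        case _                         => false // unsupported filters delete nothing
      }
    }
  }
}
```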

## How was this patch tested?

new test case.


Closes #25115 from xianyinxin/SPARK-28351.

Authored-by: xy_xin <xianyin.xxy@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-14 23:38:45 +08:00
John Zhuge 391c7e8f2e [SPARK-27739][SQL] df.persist should save stats from optimized plan
## What changes were proposed in this pull request?

CacheManager.cacheQuery saves the stats from the optimized plan to cache.
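
A hypothetical spark-shell check of the effect, assuming an active `spark` session; the APIs used are public Spark APIs, while the expected outcome is this PR's claim:
```scala
import spark.implicits._

val df = spark.range(1000).filter($"id" > 990)
df.persist()
df.count() // materialize the cache

// Statistics now come from the optimized plan, e.g. the sizeInBytes that
// join planning relies on for broadcast decisions.
println(df.queryExecution.optimizedPlan.stats.sizeInBytes)
```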

## How was this patch tested?

Existing tests.

Closes #24623 from jzhuge/SPARK-27739.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-14 19:49:53 +08:00