Commit graph

1528 commits

Author SHA1 Message Date
Sandeep Singh da02d006bb [SPARK-15249][SQL] Use FunctionResource instead of (String, String) in CreateFunction and CatalogFunction for resource
Use FunctionResource instead of (String, String) in CreateFunction and CatalogFunction for resource
see: TODO's here
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L36
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala#L42

Existing tests

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #13024 from techaddict/SPARK-15249.
2016-05-10 14:22:03 -07:00
Herman van Hovell d28c67544b [SPARK-14986][SQL] Return correct result for empty LATERAL VIEW OUTER
## What changes were proposed in this pull request?
A Generate with the `outer` flag enabled should always return one or more rows for every input row. The optimizer currently violates this by rewriting `outer` Generates that do not contain columns of the child plan into an unjoined generate, for example:
```sql
select e from a lateral view outer explode(a.b) as e
```
The result of this is that `outer` Generate does not produce output at all when the Generators' input expression is empty. This PR fixes this.

## How was this patch tested?
Added test case to `SQLQuerySuite`.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12906 from hvanhovell/SPARK-14986.
2016-05-10 12:47:31 -07:00
gatorsmile 5c6b085578 [SPARK-14603][SQL] Verification of Metadata Operations by Session Catalog
Since we cannot really trust if the underlying external catalog can throw exceptions when there is an invalid metadata operation, let's do it in SessionCatalog.

- [X] The first step is to unify the error messages issued in Hive-specific Session Catalog and general Session Catalog.
- [X] The second step is to verify the inputs of metadata operations for partitioning-related operations. This is moved to a separate PR: https://github.com/apache/spark/pull/12801
- [X] The third step is to add database existence verification in `SessionCatalog`
- [X] The fourth step is to add table existence verification in `SessionCatalog`
- [X] The fifth step is to add function existence verification in `SessionCatalog`

Add test cases and verify the error messages we issued

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12385 from gatorsmile/verifySessionAPIs.
2016-05-10 11:25:55 -07:00
Herman van Hovell 2646265368 [SPARK-14773] [SPARK-15179] [SQL] Fix SQL building and enable Hive tests
## What changes were proposed in this pull request?
This PR fixes SQL building for predicate subqueries and correlated scalar subqueries. It also enables most Hive subquery tests.

## How was this patch tested?
Enabled new tests in HiveComparisionSuite.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12988 from hvanhovell/SPARK-14773.
2016-05-10 09:56:07 -07:00
gatorsmile 5706472670 [SPARK-15215][SQL] Fix Explain Parsing and Output
#### What changes were proposed in this pull request?
This PR is to address a few existing issues in `EXPLAIN`:
- The `EXPLAIN` options `LOGICAL | FORMATTED | EXTENDED | CODEGEN` should not be 0 or more match. It should 0 or one match. Parser does not allow users to use more than one option in a single command.
- The option `LOGICAL` is not supported. Issue an exception when users specify this option in the command.
- The output of `EXPLAIN ` contains a weird empty line when the output of analyzed plan is empty. We should remove it. For example:
  ```
  == Parsed Logical Plan ==
  CreateTable CatalogTable(`t`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.  HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(col,int,true,None)),List(),List(),List(),-1,,1462725171656,-1,Map(),None,None,None), false

  == Analyzed Logical Plan ==

  CreateTable CatalogTable(`t`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.  HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(col,int,true,None)),List(),List(),List(),-1,,1462725171656,-1,Map(),None,None,None), false

  == Optimized Logical Plan ==
  CreateTable CatalogTable(`t`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.  HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(col,int,true,None)),List(),List(),List(),-1,,1462725171656,-1,Map(),None,None,None), false
  ...
  ```

#### How was this patch tested?
Added and modified a few test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12991 from gatorsmile/explainCreateTable.
2016-05-10 11:53:37 +02:00
gatorsmile f45379173b [SPARK-15187][SQL] Disallow Dropping Default Database
#### What changes were proposed in this pull request?
In Hive Metastore, dropping default database is not allowed. However, in `InMemoryCatalog`, this is allowed.

This PR is to disallow users to drop default database.

#### How was this patch tested?
Previously, we already have a test case in HiveDDLSuite. Now, we also add the same one in DDLSuite

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12962 from gatorsmile/dropDefaultDB.
2016-05-10 11:57:01 +08:00
Andrew Or 8f932fb88d [SPARK-15234][SQL] Fix spark.catalog.listDatabases.show()
## What changes were proposed in this pull request?

Before:
```
scala> spark.catalog.listDatabases.show()
+--------------------+-----------+-----------+
|                name|description|locationUri|
+--------------------+-----------+-----------+
|Database[name='de...|
|Database[name='my...|
|Database[name='so...|
+--------------------+-----------+-----------+
```

After:
```
+-------+--------------------+--------------------+
|   name|         description|         locationUri|
+-------+--------------------+--------------------+
|default|Default Hive data...|file:/user/hive/w...|
|  my_db|  This is a database|file:/Users/andre...|
|some_db|                    |file:/private/var...|
+-------+--------------------+--------------------+
```

## How was this patch tested?

New test in `CatalogSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #13015 from andrewor14/catalog-show.
2016-05-09 20:02:23 -07:00
Josh Rosen c3350cadb8 [SPARK-14972] Improve performance of JSON schema inference's compatibleType method
This patch improves the performance of `InferSchema.compatibleType` and `inferField`. The net result of this patch is a 6x speedup in local benchmarks running against cached data with a massive nested schema.

The key idea is to remove unnecessary sorting in `compatibleType`'s `StructType` merging code. This code takes two structs, merges the fields with matching names, and copies over the unique fields, producing a new schema which is the union of the two structs' schemas. Previously, this code performed a very inefficient `groupBy()` to match up fields with the same name, but this is unnecessary because `inferField` already sorts structs' fields by name: since both lists of fields are sorted, we can simply merge them in a single pass.

This patch also speeds up the existing field sorting in `inferField`: the old sorting code allocated unnecessary intermediate collections, while the new code uses mutable collects and performs in-place sorting.

I rewrote inefficient `equals()` implementations in `StructType` and `Metadata`, significantly reducing object allocations in those methods.

Finally, I replaced a `treeAggregate` call with `fold`: I doubt that `treeAggregate` will benefit us very much because the schemas would have to be enormous to realize large savings in network traffic. Since most schemas are probably fairly small in serialized form, they should typically fit within a direct task result and therefore can be incrementally merged at the driver as individual tasks finish. This change eliminates an entire (short) scheduler stage.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #12750 from JoshRosen/schema-inference-speedups.
2016-05-09 13:11:18 -07:00
Zheng RuiFeng dfdcab00c7 [SPARK-15210][SQL] Add missing @DeveloperApi annotation in sql.types
add DeveloperApi annotation for `AbstractDataType` `MapType` `UserDefinedType`

local build

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #12982 from zhengruifeng/types_devapi.
2016-05-09 11:21:16 -07:00
Liang-Chi Hsieh e083db2e9e [SPARK-15225][SQL] Replace SQLContext with SparkSession in Encoder documentation
`Encoder`'s doc mentions `sqlContext.implicits._`. We should use `sparkSession.implicits._` instead now.

Only doc update.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #13002 from viirya/encoder-doc.
2016-05-09 11:06:08 -07:00
Wenchen Fan beb16ec556 [SPARK-15093][SQL] create/delete/rename directory for InMemoryCatalog operations if needed
## What changes were proposed in this pull request?

following operations have file system operation now:

1. CREATE DATABASE: create a dir
2. DROP DATABASE: delete the dir
3. CREATE TABLE: create a dir
4. DROP TABLE: delete the dir
5. RENAME TABLE: rename the dir
6. CREATE PARTITIONS: create a dir
7. RENAME PARTITIONS: rename the dir
8. DROP PARTITIONS: drop the dir

## How was this patch tested?

new tests in `ExternalCatalogSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12871 from cloud-fan/catalog.
2016-05-09 10:47:45 -07:00
Ryan Blue 652bbb1bf6 [SPARK-14459][SQL] Detect relation partitioning and adjust the logical plan
## What changes were proposed in this pull request?

This detects a relation's partitioning and adds checks to the analyzer.
If an InsertIntoTable node has no partitioning, it is replaced by the
relation's partition scheme and input columns are correctly adjusted,
placing the partition columns at the end in partition order. If an
InsertIntoTable node has partitioning, it is checked against the table's
reported partitions.

These changes required adding a PartitionedRelation trait to the catalog
interface because Hive's MetastoreRelation doesn't extend
CatalogRelation.

This commit also includes a fix to InsertIntoTable's resolved logic,
which now detects that all expected columns are present, including
dynamic partition columns. Previously, the number of expected columns
was not checked and resolved was true if there were missing columns.

## How was this patch tested?

This adds new tests to the InsertIntoTableSuite that are fixed by this PR.

Author: Ryan Blue <blue@apache.org>

Closes #12239 from rdblue/SPARK-14459-detect-hive-partitioning.
2016-05-09 17:01:23 +08:00
gatorsmile a59ab594ca [SPARK-15184][SQL] Fix Silent Removal of An Existent Temp Table by Rename Table
#### What changes were proposed in this pull request?
Currently, if we rename a temp table `Tab1` to another existent temp table `Tab2`. `Tab2` will be silently removed. This PR is to detect it and issue an exception message.

In addition, this PR also detects another issue in the rename table command. When the destination table identifier does have database name, we should not ignore them. That might mean users could rename a regular table.

#### How was this patch tested?
Added two related test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12959 from gatorsmile/rewriteTable.
2016-05-09 13:05:18 +08:00
gatorsmile e9131ec277 [SPARK-15185][SQL] InMemoryCatalog: Silent Removal of an Existent Table/Function/Partitions by Rename
#### What changes were proposed in this pull request?
So far, in the implementation of InMemoryCatalog, we do not check if the new/destination table/function/partition exists or not. Thus, we just silently remove the existent table/function/partition.

This PR is to detect them and issue an appropriate exception.

#### How was this patch tested?
Added the related test cases. They also verify if HiveExternalCatalog also detects these errors.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12960 from gatorsmile/renameInMemoryCatalog.
2016-05-09 12:40:30 +08:00
Herman van Hovell df89f1d43d [SPARK-15122] [SQL] Fix TPC-DS 41 - Normalize predicates before pulling them out
## What changes were proposed in this pull request?
The official TPC-DS 41 query currently fails because it contains a scalar subquery with a disjunctive correlated predicate (the correlated predicates were nested in ORs). This makes the `Analyzer` pull out the entire predicate which is wrong and causes the following (correct) analysis exception: `The correlated scalar subquery can only contain equality predicates`

This PR fixes this by first simplifing (or normalizing) the correlated predicates before pulling them out of the subquery.

## How was this patch tested?
Manual testing on TPC-DS 41, and added a test to SubquerySuite.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12954 from hvanhovell/SPARK-15122.
2016-05-06 21:06:03 -07:00
gatorsmile 5c8fad7b9b [SPARK-15108][SQL] Describe Permanent UDTF
#### What changes were proposed in this pull request?
When Describe a UDTF, the command returns a wrong result. The command is unable to find the function, which has been created and cataloged in the catalog but not in the functionRegistry.

This PR is to correct it. If the function is not in the functionRegistry, we will check the catalog for collecting the information of the UDTF function.

#### How was this patch tested?
Added test cases to verify the results

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12885 from gatorsmile/showFunction.
2016-05-06 11:43:07 -07:00
Jacek Laskowski bbb7773437 [SPARK-15152][DOC][MINOR] Scaladoc and Code style Improvements
## What changes were proposed in this pull request?

Minor doc and code style fixes

## How was this patch tested?

local build

Author: Jacek Laskowski <jacek@japila.pl>

Closes #12928 from jaceklaskowski/SPARK-15152.
2016-05-05 16:34:27 -07:00
Shixiong Zhu bb9991dec5 [SPARK-15135][SQL] Make sure SparkSession thread safe
## What changes were proposed in this pull request?

Went through SparkSession and its members and fixed non-thread-safe classes used by SparkSession

## How was this patch tested?

Existing unit tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12915 from zsxwing/spark-session-thread-safe.
2016-05-05 14:36:47 -07:00
gatorsmile 8cba57a75c [SPARK-14124][SQL][FOLLOWUP] Implement Database-related DDL Commands
#### What changes were proposed in this pull request?

First, a few test cases failed in mac OS X  because the property value of `java.io.tmpdir` does not include a trailing slash on some platform. Hive always removes the last trailing slash. For example, what I got in the web:
```
Win NT  --> C:\TEMP\
Win XP  --> C:\TEMP
Solaris --> /var/tmp/
Linux   --> /var/tmp
```
Second, a couple of test cases are added to verify if the commands work properly.

#### How was this patch tested?
Added a test case for it and correct the previous test cases.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12081 from gatorsmile/mkdir.
2016-05-05 14:34:24 -07:00
Wenchen Fan 55cc1c991a [SPARK-14139][SQL] RowEncoder should preserve schema nullability
## What changes were proposed in this pull request?

The problem is: In `RowEncoder`, we use `Invoke` to get the field of an external row, which lose the nullability information. This PR creates a `GetExternalRowField` expression, so that we can preserve the nullability info.

TODO: simplify the null handling logic in `RowEncoder`, to remove so many if branches, in follow-up PR.

## How was this patch tested?

new tests in `RowEncoderSuite`

Note that, This PR takes over https://github.com/apache/spark/pull/11980, with a little simplification, so all credits should go to koertkuipers

Author: Wenchen Fan <wenchen@databricks.com>
Author: Koert Kuipers <koert@tresata.com>

Closes #12364 from cloud-fan/nullable.
2016-05-06 01:08:04 +08:00
Kousuke Saruta 1a9b341581 [SPARK-15132][MINOR][SQL] Debug log for generated code should be printed with proper indentation
## What changes were proposed in this pull request?

Similar to #11990, GenerateOrdering and GenerateColumnAccessor should print debug log for generated code with proper indentation.

## How was this patch tested?

Manually checked.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #12908 from sarutak/SPARK-15132.
2016-05-04 22:18:55 -07:00
Sean Zhong 8fb1463d6a [SPARK-6339][SQL] Supports CREATE TEMPORARY VIEW tableIdentifier AS query
## What changes were proposed in this pull request?

This PR support new SQL syntax CREATE TEMPORARY VIEW.
Like:
```
CREATE TEMPORARY VIEW viewName AS SELECT * from xx
CREATE OR REPLACE TEMPORARY VIEW viewName AS SELECT * from xx
CREATE TEMPORARY VIEW viewName (c1 COMMENT 'blabla', c2 COMMENT 'blabla') AS SELECT * FROM xx
```

## How was this patch tested?

Unit tests.

Author: Sean Zhong <clockfly@gmail.com>

Closes #12872 from clockfly/spark-6399.
2016-05-04 18:27:25 -07:00
Liang-Chi Hsieh b85d21fb9d [SPARK-14951] [SQL] Support subexpression elimination in TungstenAggregate
## What changes were proposed in this pull request?

We can support subexpression elimination in TungstenAggregate by using current `EquivalentExpressions` which is already used in subexpression elimination for expression codegen.

However, in wholestage codegen, we can't wrap the common expression's codes in functions as before, we simply generate the code snippets for common expressions. These code snippets are inserted before the common expressions are actually used in generated java codes.

For multiple `TypedAggregateExpression` used in aggregation operator, since their input type should be the same. So their `inputDeserializer` will be the same too. This patch can also reduce redundant input deserialization.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #12729 from viirya/subexpr-elimination-tungstenaggregate.
2016-05-04 10:54:51 -07:00
Cheng Lian f152fae306 [SPARK-14127][SQL] Native "DESC [EXTENDED | FORMATTED] <table>" DDL command
## What changes were proposed in this pull request?

This PR implements native `DESC [EXTENDED | FORMATTED] <table>` DDL command. Sample output:

```
scala> spark.sql("desc extended src").show(100, truncate = false)
+----------------------------+---------------------------------+-------+
|col_name                    |data_type                        |comment|
+----------------------------+---------------------------------+-------+
|key                         |int                              |       |
|value                       |string                           |       |
|                            |                                 |       |
|# Detailed Table Information|CatalogTable(`default`.`src`, ...|       |
+----------------------------+---------------------------------+-------+

scala> spark.sql("desc formatted src").show(100, truncate = false)
+----------------------------+----------------------------------------------------------+-------+
|col_name                    |data_type                                                 |comment|
+----------------------------+----------------------------------------------------------+-------+
|key                         |int                                                       |       |
|value                       |string                                                    |       |
|                            |                                                          |       |
|# Detailed Table Information|                                                          |       |
|Database:                   |default                                                   |       |
|Owner:                      |lian                                                      |       |
|Create Time:                |Mon Jan 04 17:06:00 CST 2016                              |       |
|Last Access Time:           |Thu Jan 01 08:00:00 CST 1970                              |       |
|Location:                   |hdfs://localhost:9000/user/hive/warehouse_hive121/src     |       |
|Table Type:                 |MANAGED                                                   |       |
|Table Parameters:           |                                                          |       |
|  transient_lastDdlTime     |1451898360                                                |       |
|                            |                                                          |       |
|# Storage Information       |                                                          |       |
|SerDe Library:              |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe        |       |
|InputFormat:                |org.apache.hadoop.mapred.TextInputFormat                  |       |
|OutputFormat:               |org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat|       |
|Num Buckets:                |-1                                                        |       |
|Bucket Columns:             |[]                                                        |       |
|Sort Columns:               |[]                                                        |       |
|Storage Desc Parameters:    |                                                          |       |
|  serialization.format      |1                                                         |       |
+----------------------------+----------------------------------------------------------+-------+
```

## How was this patch tested?

A test case is added to `HiveDDLSuite` to check command output.

Author: Cheng Lian <lian@databricks.com>

Closes #12844 from liancheng/spark-14127-desc-table.
2016-05-04 16:44:09 +08:00
Wenchen Fan 6c12e801e8 [SPARK-15029] improve error message for Generate
## What changes were proposed in this pull request?

This PR improve the error message for `Generate` in 3 cases:

1. generator is nested in expressions, e.g. `SELECT explode(list) + 1 FROM tbl`
2. generator appears more than one time in SELECT, e.g. `SELECT explode(list), explode(list) FROM tbl`
3. generator appears in other operator which is not project, e.g. `SELECT * FROM tbl SORT BY explode(list)`

## How was this patch tested?

new tests in `AnalysisErrorSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12810 from cloud-fan/bug.
2016-05-04 00:10:20 -07:00
Andrew Or 6ba17cd147 [SPARK-14414][SQL] Make DDL exceptions more consistent
## What changes were proposed in this pull request?

Just a bunch of small tweaks on DDL exception messages.

## How was this patch tested?

`DDLCommandSuite` et al.

Author: Andrew Or <andrew@databricks.com>

Closes #12853 from andrewor14/make-exceptions-consistent.
2016-05-03 18:07:53 -07:00
gatorsmile 71296c041e [SPARK-15056][SQL] Parse Unsupported Sampling Syntax and Issue Better Exceptions
#### What changes were proposed in this pull request?
Compared with the current Spark parser, there are two extra syntax are supported in Hive for sampling
- In `On` clauses, `rand()` is used for indicating sampling on the entire row instead of an individual column. For example,

   ```SQL
   SELECT * FROM source TABLESAMPLE(BUCKET 3 OUT OF 32 ON rand()) s;
   ```
- Users can specify the total length to be read. For example,

   ```SQL
   SELECT * FROM source TABLESAMPLE(100M) s;
   ```

Below is the link for references:
   https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Sampling

This PR is to parse and capture these two extra syntax, and issue a better error message.

#### How was this patch tested?
Added test cases to verify the thrown exceptions

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12838 from gatorsmile/bucketOnRand.
2016-05-03 23:20:18 +02:00
Andrew Ray d8f528ceb6 [SPARK-13749][SQL][FOLLOW-UP] Faster pivot implementation for many distinct values with two phase aggregation
## What changes were proposed in this pull request?

This is a follow up PR for #11583. It makes 3 lazy vals into just vals and adds unit test coverage.

## How was this patch tested?

Existing unit tests and additional unit tests.

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #12861 from aray/fast-pivot-follow-up.
2016-05-02 22:47:32 -07:00
bomeng 0fd95be3cd [SPARK-15062][SQL] fix list type infer serializer issue
## What changes were proposed in this pull request?

Make serializer correctly inferred if the input type is `List[_]`, since `List[_]` is type of `Seq[_]`, before it was matched to different case (`case t if definedByConstructorParams(t)`).

## How was this patch tested?

New test case was added.

Author: bomeng <bmeng@us.ibm.com>

Closes #12849 from bomeng/SPARK-15062.
2016-05-02 18:20:29 -07:00
Herman van Hovell 1c19c2769e [SPARK-15047][SQL] Cleanup SQL Parser
## What changes were proposed in this pull request?
This PR addresses a few minor issues in SQL parser:

- Removes some unused rules and keywords in the grammar.
- Removes code path for fallback SQL parsing (was needed for Hive native parsing).
- Use `UnresolvedGenerator` instead of hard-coding `Explode` & `JsonTuple`.
- Adds a more generic way of creating error messages for unsupported Hive features.
- Use `visitFunctionName` as much as possible.
- Interpret a `CatalogColumn`'s `DataType` directly instead of parsing it again.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12826 from hvanhovell/SPARK-15047.
2016-05-02 18:12:31 -07:00
Herman van Hovell f362363d14 [SPARK-14785] [SQL] Support correlated scalar subqueries
## What changes were proposed in this pull request?
In this PR we add support for correlated scalar subqueries. An example of such a query is:
```SQL
select * from tbl1 a where a.value > (select max(value) from tbl2 b where b.key = a.key)
```
The implementation adds the `RewriteCorrelatedScalarSubquery` rule to the Optimizer. This rule plans these subqueries using `LEFT OUTER` joins. It currently supports rewrites for `Project`, `Aggregate` & `Filter` logical plans.

I could not find a well defined semantics for the use of scalar subqueries in an `Aggregate`. The current implementation currently evaluates the scalar subquery *before* aggregation. This means that you either have to make scalar subquery part of the grouping expression, or that you have to aggregate it further on. I am open to suggestions on this.

The implementation currently forces the uniqueness of a scalar subquery by enforcing that it is aggregated and that the resulting column is wrapped in an `AggregateExpression`.

## How was this patch tested?
Added tests to `SubquerySuite`.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12822 from hvanhovell/SPARK-14785.
2016-05-02 16:32:31 -07:00
Davies Liu 95e372141a [SPARK-14781] [SQL] support nested predicate subquery
## What changes were proposed in this pull request?

In order to support nested predicate subquery, this PR introduce an internal join type ExistenceJoin, which will emit all the rows from left, plus an additional column, which presents there are any rows matched from right or not (it's not null-aware right now). This additional column could be used to replace the subquery in Filter.

In theory, all the predicate subquery could use this join type, but it's slower than LeftSemi and LeftAnti, so it's only used for nested subquery (subquery inside OR).

For example, the following SQL:
```sql
SELECT a FROM t  WHERE EXISTS (select 0) OR EXISTS (select 1)
```

This PR also fix a bug in predicate subquery push down through join (they should not).

Nested null-aware subquery is still not supported. For example,   `a > 3 OR b NOT IN (select bb from t)`

After this, we could run TPCDS query Q10, Q35, Q45

## How was this patch tested?

Added unit tests.

Author: Davies Liu <davies@databricks.com>

Closes #12820 from davies/or_exists.
2016-05-02 12:58:59 -07:00
Dongjoon Hyun 6e6320122e [SPARK-14830][SQL] Add RemoveRepetitionFromGroupExpressions optimizer.
## What changes were proposed in this pull request?

This PR aims to optimize GroupExpressions by removing repeating expressions. `RemoveRepetitionFromGroupExpressions` is added.

**Before**
```scala
scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain()
== Physical Plan ==
WholeStageCodegen
:  +- TungstenAggregate(key=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9], functions=[], output=[(a + 1)#5])
:     +- INPUT
+- Exchange hashpartitioning((a#0 + 1)#6, (1 + a#0)#7, (A#0 + 1)#8, (1 + A#0)#9, 200), None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6,(1 + a#0) AS (1 + a#0)#7,(A#0 + 1) AS (A#0 + 1)#8,(1 + A#0) AS (1 + A#0)#9], functions=[], output=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9])
      :     +- INPUT
      +- LocalTableScan [a#0], [[1],[2]]
```

**After**
```scala
scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain()
== Physical Plan ==
WholeStageCodegen
:  +- TungstenAggregate(key=[(a#0 + 1)#6], functions=[], output=[(a + 1)#5])
:     +- INPUT
+- Exchange hashpartitioning((a#0 + 1)#6, 200), None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6], functions=[], output=[(a#0 + 1)#6])
      :     +- INPUT
      +- LocalTableScan [a#0], [[1],[2]]
```

## How was this patch tested?

Pass the Jenkins tests (with a new testcase)

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12590 from dongjoon-hyun/SPARK-14830.
2016-05-02 12:40:21 -07:00
Andrew Ray 9927441868 [SPARK-13749][SQL] Faster pivot implementation for many distinct values with two phase aggregation
## What changes were proposed in this pull request?

The existing implementation of pivot translates into a single aggregation with one aggregate per distinct pivot value. When the number of distinct pivot values is large (say 1000+) this can get extremely slow since each input value gets evaluated on every aggregate even though it only affects the value of one of them.

I'm proposing an alternate strategy for when there are 10+ (somewhat arbitrary threshold) distinct pivot values. We do two phases of aggregation. In the first we group by the grouping columns plus the pivot column and perform the specified aggregations (one or sometimes more). In the second aggregation we group by the grouping columns and use the new (non public) PivotFirst aggregate that rearranges the outputs of the first aggregation into an array indexed by the pivot value. Finally we do a project to extract the array entries into the appropriate output column.

## How was this patch tested?

Additional unit tests in DataFramePivotSuite and manual larger scale testing.

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #11583 from aray/fast-pivot.
2016-05-02 11:12:55 -07:00
Wenchen Fan 0513c3ac93 [SPARK-14637][SQL] object expressions cleanup
## What changes were proposed in this pull request?

Simplify and clean up some object expressions:

1. simplify the logic to handle `propagateNull`
2. add `propagateNull` parameter to `Invoke`
3. simplify the unbox logic in `Invoke`
4. other minor cleanup

TODO: simplify `MapObjects`

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12399 from cloud-fan/object.
2016-05-02 10:21:14 -07:00
Yin Huai 0182d9599d [SPARK-15034][SPARK-15035][SPARK-15036][SQL] Use spark.sql.warehouse.dir as the warehouse location
This PR contains three changes:
1. We will use spark.sql.warehouse.dir set warehouse location. We will not use hive.metastore.warehouse.dir.
2. SessionCatalog needs to set the location to default db. Otherwise, when creating a table in SparkSession without hive support, the default db's path will be an empty string.
3. When we create a database, we need to make the path qualified.

Existing tests and new tests

Author: Yin Huai <yhuai@databricks.com>

Closes #12812 from yhuai/warehouse.
2016-04-30 18:04:42 -07:00
Wenchen Fan 43b149fb88 [SPARK-14850][ML] convert primitive array from/to unsafe array directly in VectorUDT/MatrixUDT
## What changes were proposed in this pull request?

This PR adds `fromPrimitiveArray` and `toPrimitiveArray` in `UnsafeArrayData`, so that we can do the conversion much faster in VectorUDT/MatrixUDT.

## How was this patch tested?

existing tests and new test suite `UnsafeArraySuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12640 from cloud-fan/ml.
2016-04-29 23:04:51 -07:00
Yin Huai ac41fc648d [SPARK-14591][SQL] Remove DataTypeParser and add more keywords to the nonReserved list.
## What changes were proposed in this pull request?
CatalystSqlParser can parse data types. So, we do not need to have an individual DataTypeParser.

## How was this patch tested?
Existing tests

Author: Yin Huai <yhuai@databricks.com>

Closes #12796 from yhuai/removeDataTypeParser.
2016-04-29 22:49:12 -07:00
Reynold Xin 7945f9f6d4 [SPARK-14757] [SQL] Fix nullability bug in EqualNullSafe codegen
## What changes were proposed in this pull request?
This patch fixes a null handling bug in EqualNullSafe's code generation.

## How was this patch tested?
Updated unit test so they would fail without the fix.

Closes #12628.

Author: Reynold Xin <rxin@databricks.com>
Author: Arash Nabili <arash@levyx.com>

Closes #12799 from rxin/equalnullsafe.
2016-04-29 22:26:12 -07:00
Herman van Hovell 83061be697 [SPARK-14858] [SQL] Enable subquery pushdown
The previous subquery PRs did not include support for pushing subqueries used in filters (`WHERE`/`HAVING`) down. This PR adds this support. For example :
```scala
range(0, 10).registerTempTable("a")
range(5, 15).registerTempTable("b")
range(7, 25).registerTempTable("c")
range(3, 12).registerTempTable("d")
val plan = sql("select * from a join b on a.id = b.id left join c on c.id = b.id where a.id in (select id from d)")
plan.explain(true)
```
Leads to the following Analyzed & Optimized plans:
```
== Parsed Logical Plan ==
...

== Analyzed Logical Plan ==
id: bigint, id: bigint, id: bigint
Project [id#0L,id#4L,id#8L]
+- Filter predicate-subquery#16 [(id#0L = id#12L)]
   :  +- SubqueryAlias predicate-subquery#16 [(id#0L = id#12L)]
   :     +- Project [id#12L]
   :        +- SubqueryAlias d
   :           +- Range 3, 12, 1, 8, [id#12L]
   +- Join LeftOuter, Some((id#8L = id#4L))
      :- Join Inner, Some((id#0L = id#4L))
      :  :- SubqueryAlias a
      :  :  +- Range 0, 10, 1, 8, [id#0L]
      :  +- SubqueryAlias b
      :     +- Range 5, 15, 1, 8, [id#4L]
      +- SubqueryAlias c
         +- Range 7, 25, 1, 8, [id#8L]

== Optimized Logical Plan ==
Join LeftOuter, Some((id#8L = id#4L))
:- Join Inner, Some((id#0L = id#4L))
:  :- Join LeftSemi, Some((id#0L = id#12L))
:  :  :- Range 0, 10, 1, 8, [id#0L]
:  :  +- Range 3, 12, 1, 8, [id#12L]
:  +- Range 5, 15, 1, 8, [id#4L]
+- Range 7, 25, 1, 8, [id#8L]

== Physical Plan ==
...
```
I have also taken the opportunity to move quite a bit of code around:
- Rewriting subqueris and pulling out correlated predicated from subqueries has been moved into the analyzer. The analyzer transforms `Exists` and `InSubQuery` into `PredicateSubquery` expressions. A PredicateSubquery exposes the 'join' expressions and the proper references. This makes things like type coercion, optimization and planning easier to do.
- I have added support for `Aggregate` plans in subqueries. Any correlated expressions will be added to the grouping expressions. I have removed support for `Union` plans, since pulling in an outer reference from beneath a Union has no value (a filtered value could easily be part of another Union child).
- Resolution of subqueries is now done using `OuterReference`s. These are used to wrap any outer reference; this makes the identification of these references easier, and also makes dealing with duplicate attributes in the outer and inner plans easier. The resolution of subqueries initially used a resolution loop which would alternate between calling the analyzer and trying to resolve the outer references. We now use a dedicated analyzer which uses a special rule for outer reference resolution.

These changes are a stepping stone for enabling correlated scalar subqueries, enabling all Hive tests & allowing us to use predicate subqueries anywhere.

Current tests and added test cases in FilterPushdownSuite.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12720 from hvanhovell/SPARK-14858.
2016-04-29 16:50:12 -07:00
Sun Rui 4ae9fe091c [SPARK-12919][SPARKR] Implement dapply() on DataFrame in SparkR.
## What changes were proposed in this pull request?

dapply() applies an R function on each partition of a DataFrame and returns a new DataFrame.

The function signature is:

	dapply(df, function(localDF) {}, schema = NULL)

R function input: local data.frame from the partition on local node
R function output: local data.frame

Schema specifies the Row format of the resulting DataFrame. It must match the R function's output.
If schema is not specified, each partition of the result DataFrame will be serialized in R into a single byte array. Such resulting DataFrame can be processed by successive calls to dapply().

## How was this patch tested?
SparkR unit tests.

Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>

Closes #12493 from sun-rui/SPARK-12919.
2016-04-29 16:41:07 -07:00
Reynold Xin 054f991c43 [SPARK-14994][SQL] Remove execution hive from HiveSessionState
## What changes were proposed in this pull request?
This patch removes executionHive from HiveSessionState and HiveSharedState.

## How was this patch tested?
Updated test cases.

Author: Reynold Xin <rxin@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12770 from rxin/SPARK-14994.
2016-04-29 01:14:02 -07:00
gatorsmile 222dcf7937 [SPARK-12660][SPARK-14967][SQL] Implement Except Distinct by Left Anti Join
#### What changes were proposed in this pull request?
Replaces a logical `Except` operator with a `Left-anti Join` operator. This way, we can take advantage of all the benefits of join implementations (e.g. managed memory, code generation, broadcast joins).
```SQL
  SELECT a1, a2 FROM Tab1 EXCEPT SELECT b1, b2 FROM Tab2
  ==>  SELECT DISTINCT a1, a2 FROM Tab1 LEFT ANTI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
```
 Note:
 1. This rule is only applicable to EXCEPT DISTINCT. Do not use it for EXCEPT ALL.
 2. This rule has to be done after de-duplicating the attributes; otherwise, the enerated
    join conditions will be incorrect.

This PR also corrects the existing behavior in Spark. Before this PR, the behavior is like
```SQL
  test("except") {
    val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
    val df_right = Seq(1, 3).toDF("id")

    checkAnswer(
      df_left.except(df_right),
      Row(2) :: Row(2) :: Row(4) :: Nil
    )
  }
```
After this PR, the result is corrected. We strictly follow the SQL compliance of `Except Distinct`.

#### How was this patch tested?
Modified and added a few test cases to verify the optimization rule and the results of operators.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12736 from gatorsmile/exceptByAntiJoin.
2016-04-29 15:30:36 +08:00
Reynold Xin 4607f6e7f7 [SPARK-14991][SQL] Remove HiveNativeCommand
## What changes were proposed in this pull request?
This patch removes HiveNativeCommand, so we can continue to remove the dependency on Hive. This pull request also removes the ability to generate golden result file using Hive.

## How was this patch tested?
Updated tests to reflect this.

Author: Reynold Xin <rxin@databricks.com>

Closes #12769 from rxin/SPARK-14991.
2016-04-28 21:58:48 -07:00
Gregory Hart 12c360c057 [SPARK-14965][SQL] Indicate an exception is thrown for a missing struct field
## What changes were proposed in this pull request?

Fix to ScalaDoc for StructType.

## How was this patch tested?

Built locally.

Author: Gregory Hart <greg.hart@thinkbiganalytics.com>

Closes #12758 from freastro/hotfix/SPARK-14965.
2016-04-28 11:21:43 -07:00
Liang-Chi Hsieh 7c6937a885 [SPARK-14487][SQL] User Defined Type registration without SQLUserDefinedType annotation
## What changes were proposed in this pull request?

Currently we use `SQLUserDefinedType` annotation to register UDTs for user classes. However, by doing this, we add Spark dependency to user classes.

For some user classes, it is unnecessary to add such dependency that will increase deployment difficulty.

We should provide alternative approach to register UDTs for user classes without `SQLUserDefinedType` annotation.

## How was this patch tested?

`UserDefinedTypeSuite`

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #12259 from viirya/improve-sql-usertype.
2016-04-28 01:14:49 -07:00
Andrew Or 37575115b9 [SPARK-14940][SQL] Move ExternalCatalog to own file
## What changes were proposed in this pull request?

`interfaces.scala` was getting big. This just moves the biggest class in there to a new file for cleanliness.

## How was this patch tested?

Just moving things around.

Author: Andrew Or <andrew@databricks.com>

Closes #12721 from andrewor14/move-external-catalog.
2016-04-27 14:17:36 -07:00
Cheng Lian 24bea00047 [SPARK-14954] [SQL] Add PARTITION BY and BUCKET BY clause for data source CTAS syntax
Currently, we can only create persisted partitioned and/or bucketed data source tables using the Dataset API but not using SQL DDL. This PR implements the following syntax to add partitioning and bucketing support to the SQL DDL:

```
CREATE TABLE <table-name>
USING <provider> [OPTIONS (<key1> <value1>, <key2> <value2>, ...)]
[PARTITIONED BY (col1, col2, ...)]
[CLUSTERED BY (col1, col2, ...) [SORTED BY (col1, col2, ...)] INTO <n> BUCKETS]
AS SELECT ...
```

Test cases are added in `MetastoreDataSourcesSuite` to check the newly added syntax.

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12734 from liancheng/spark-14954.
2016-04-27 13:55:13 -07:00
Dongjoon Hyun af92299fdb [SPARK-14664][SQL] Implement DecimalAggregates optimization for Window queries
## What changes were proposed in this pull request?

This PR aims to implement decimal aggregation optimization for window queries by improving existing `DecimalAggregates`. Historically, `DecimalAggregates` optimizer is designed to transform general `sum/avg(decimal)`, but it breaks recently added windows queries like the followings. The following queries work well without the current `DecimalAggregates` optimizer.

**Sum**
```scala
scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").head
java.lang.RuntimeException: Unsupported window function: MakeDecimal((sum(UnscaledValue(a#31)),mode=Complete,isDistinct=false),12,1)
scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23]
:     +- INPUT
+- Window [MakeDecimal((sum(UnscaledValue(a#21)),mode=Complete,isDistinct=false),12,1) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#21]
         +- Scan OneRowRelation[]
```

**Average**
```scala
scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").head
java.lang.RuntimeException: Unsupported window function: cast(((avg(UnscaledValue(a#40)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5))
scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44]
:     +- INPUT
+- Window [cast(((avg(UnscaledValue(a#42)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5)) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#42]
         +- Scan OneRowRelation[]
```

After this PR, those queries work fine and new optimized physical plans look like the followings.

**Sum**
```scala
scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35]
:     +- INPUT
+- Window [MakeDecimal((sum(UnscaledValue(a#33)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),12,1) AS sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#33]
         +- Scan OneRowRelation[]
```

**Average**
```scala
scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47]
:     +- INPUT
+- Window [cast(((avg(UnscaledValue(a#45)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) / 10.0) as decimal(6,5)) AS avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#45]
         +- Scan OneRowRelation[]
```

In this PR, *SUM over window* pattern matching is based on the code of hvanhovell ; he should be credited for the work he did.

## How was this patch tested?

Pass the Jenkins tests (with newly added testcases)

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12421 from dongjoon-hyun/SPARK-14664.
2016-04-27 21:36:19 +02:00
Yin Huai 54a3eb8312 [SPARK-14130][SQL] Throw exceptions for ALTER TABLE ADD/REPLACE/CHANGE COLUMN, ALTER TABLE SET FILEFORMAT, DFS, and transaction related commands
## What changes were proposed in this pull request?
This PR will make Spark SQL not allow ALTER TABLE ADD/REPLACE/CHANGE COLUMN, ALTER TABLE SET FILEFORMAT, DFS, and transaction related commands.

## How was this patch tested?
Existing tests. For those tests that I put in the blacklist, I am adding the useful parts back to SQLQuerySuite.

Author: Yin Huai <yhuai@databricks.com>

Closes #12714 from yhuai/banNativeCommand.
2016-04-27 00:30:54 -07:00