Commit graph

1473 commits

Author SHA1 Message Date
福星 16293311cd [SPARK-18237][HIVE] hive.exec.stagingdir have no effect
hive.exec.stagingdir have no effect in spark2.0.1,
Hive confs in hive-site.xml will be loaded in `hadoopConf`, so we should use `hadoopConf` in `InsertIntoHiveTable` instead of `SessionState.conf`

Author: 福星 <fuxing@wacai.com>

Closes #15744 from ClassNotFoundExp/master.
2016-11-03 12:02:01 -07:00
Reynold Xin b17057c0a6 [SPARK-18244][SQL] Rename partitionProviderIsHive -> tracksPartitionsInCatalog
## What changes were proposed in this pull request?
This patch renames partitionProviderIsHive to tracksPartitionsInCatalog, as the old name was too Hive specific.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15750 from rxin/SPARK-18244.
2016-11-03 11:48:05 -07:00
Cheng Lian 27daf6bcde [SPARK-17949][SQL] A JVM object based aggregate operator
## What changes were proposed in this pull request?

This PR adds a new hash-based aggregate operator named `ObjectHashAggregateExec` that supports `TypedImperativeAggregate`, which may use arbitrary Java objects as aggregation states. Please refer to the [design doc](https://issues.apache.org/jira/secure/attachment/12834260/%5BDesign%20Doc%5D%20Support%20for%20Arbitrary%20Aggregation%20States.pdf) attached in [SPARK-17949](https://issues.apache.org/jira/browse/SPARK-17949) for more details about it.

The major benefit of this operator is better performance when evaluating `TypedImperativeAggregate` functions, especially when there are relatively few distinct groups. Functions like Hive UDAFs, `collect_list`, and `collect_set` may also benefit from this after being migrated to `TypedImperativeAggregate`.

The following feature flag is introduced to enable or disable the new aggregate operator:
- Name: `spark.sql.execution.useObjectHashAggregateExec`
- Default value: `true`

We can also configure the fallback threshold using the following SQL operation:
- Name: `spark.sql.objectHashAggregate.sortBased.fallbackThreshold`
- Default value: 128

  Fallback to sort-based aggregation when more than 128 distinct groups are accumulated in the aggregation hash map. This number is intentionally made small to avoid GC problems since aggregation buffers of this operator may contain arbitrary Java objects.

  This may be improved by implementing size tracking for this operator, but that can be done in a separate PR.

Code generation and size tracking are planned to be implemented in follow-up PRs.
## Benchmark results
### `ObjectHashAggregateExec` vs `SortAggregateExec`

The first benchmark compares `ObjectHashAggregateExec` and `SortAggregateExec` by evaluating `typed_count`, a testing `TypedImperativeAggregate` version of the SQL `count` function.

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz

object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                        31251 / 31908          3.4         298.0       1.0X
object agg w/ group by w/o fallback           6903 / 7141         15.2          65.8       4.5X
object agg w/ group by w/ fallback          20945 / 21613          5.0         199.7       1.5X
sort agg w/o group by                         4734 / 5463         22.1          45.2       6.6X
object agg w/o group by w/o fallback          4310 / 4529         24.3          41.1       7.3X
```

The next benchmark compares `ObjectHashAggregateExec` and `SortAggregateExec` by evaluating the Spark native version of `percentile_approx`.

Note that `percentile_approx` is so heavy an aggregate function that the bottleneck of the benchmark is evaluating the aggregate function itself rather than the aggregate operator since I couldn't run a large scale benchmark on my laptop. That's why the results are so close and looks counter-intuitive (aggregation with grouping is even faster than that aggregation without grouping).

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz

object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                          3418 / 3530          0.6        1630.0       1.0X
object agg w/ group by w/o fallback           3210 / 3314          0.7        1530.7       1.1X
object agg w/ group by w/ fallback            3419 / 3511          0.6        1630.1       1.0X
sort agg w/o group by                         4336 / 4499          0.5        2067.3       0.8X
object agg w/o group by w/o fallback          4271 / 4372          0.5        2036.7       0.8X
```
### Hive UDAF vs Spark AF

This benchmark compares the following two kinds of aggregate functions:
- "hive udaf": Hive implementation of `percentile_approx`, without partial aggregation supports, evaluated using `SortAggregateExec`.
- "spark af": Spark native implementation of `percentile_approx`, with partial aggregation support, evaluated using `ObjectHashAggregateExec`

The performance differences are mostly due to faster implementation and partial aggregation support in the Spark native version of `percentile_approx`.

This benchmark basically shows the performance differences between the worst case, where an aggregate function without partial aggregation support is evaluated using `SortAggregateExec`, and the best case, where a `TypedImperativeAggregate` with partial aggregation support is evaluated using `ObjectHashAggregateExec`.

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz

hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hive udaf w/o group by                        5326 / 5408          0.0       81264.2       1.0X
spark af w/o group by                           93 /  111          0.7        1415.6      57.4X
hive udaf w/ group by                         3804 / 3946          0.0       58050.1       1.4X
spark af w/ group by w/o fallback               71 /   90          0.9        1085.7      74.8X
spark af w/ group by w/ fallback                98 /  111          0.7        1501.6      54.1X
```
### Real world benchmark

We also did a relatively large benchmark using a real world query involving `percentile_approx`:
- Hive UDAF implementation, sort-based aggregation, w/o partial aggregation support

  24.77 minutes
- Native implementation, sort-based aggregation, w/ partial aggregation support

  4.64 minutes
- Native implementation, object hash aggregator, w/ partial aggregation support

  1.80 minutes
## How was this patch tested?

New unit tests and randomized test cases are added in `ObjectAggregateFunctionSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #15590 from liancheng/obj-hash-agg.
2016-11-03 09:34:51 -07:00
Reynold Xin 0ea5d5b24c [SQL] minor - internal doc improvement for InsertIntoTable.
## What changes were proposed in this pull request?
I was reading this part of the code and was really confused by the "partition" parameter. This patch adds some documentation for it to reduce confusion in the future.

I also looked around other logical plans but most of them are either already documented, or pretty self-evident to people that know Spark SQL.

## How was this patch tested?
N/A - doc change only.

Author: Reynold Xin <rxin@databricks.com>

Closes #15749 from rxin/doc-improvement.
2016-11-03 02:45:54 -07:00
hyukjinkwon 7eb2ca8e33 [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation
## What changes were proposed in this pull request?

This PR proposes to change the documentation for functions. Please refer the discussion from https://github.com/apache/spark/pull/15513

The changes include
- Re-indent the documentation
- Add examples/arguments in `extended` where the arguments are multiple or specific format (e.g. xml/ json).

For examples, the documentation was updated as below:
### Functions with single line usage

**Before**
- `pow`

  ``` sql
  Usage: pow(x1, x2) - Raise x1 to the power of x2.
  Extended Usage:
  > SELECT pow(2, 3);
   8.0
  ```
- `current_timestamp`

  ``` sql
  Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation.
  Extended Usage:
  No example for current_timestamp.
  ```

**After**
- `pow`

  ``` sql
  Usage: pow(expr1, expr2) - Raises `expr1` to the power of `expr2`.
  Extended Usage:
      Examples:
        > SELECT pow(2, 3);
         8.0
  ```

- `current_timestamp`

  ``` sql
  Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation.
  Extended Usage:
      No example/argument for current_timestamp.
  ```
### Functions with (already) multiple line usage

**Before**
- `approx_count_distinct`

  ``` sql
  Usage: approx_count_distinct(expr) - Returns the estimated cardinality by HyperLogLog++.
      approx_count_distinct(expr, relativeSD=0.05) - Returns the estimated cardinality by HyperLogLog++
        with relativeSD, the maximum estimation error allowed.

  Extended Usage:
  No example for approx_count_distinct.
  ```
- `percentile_approx`

  ``` sql
  Usage:
        percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
        column `col` at the given percentage. The value of percentage must be between 0.0
        and 1.0. The `accuracy` parameter (default: 10000) is a positive integer literal which
        controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields
        better accuracy, `1.0/accuracy` is the relative error of the approximation.

        percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy]) - Returns the approximate
        percentile array of column `col` at the given percentage array. Each value of the
        percentage array must be between 0.0 and 1.0. The `accuracy` parameter (default: 10000) is
        a positive integer literal which controls approximation accuracy at the cost of memory.
        Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of
        the approximation.

  Extended Usage:
  No example for percentile_approx.
  ```

**After**
- `approx_count_distinct`

  ``` sql
  Usage:
      approx_count_distinct(expr[, relativeSD]) - Returns the estimated cardinality by HyperLogLog++.
        `relativeSD` defines the maximum estimation error allowed.

  Extended Usage:
      No example/argument for approx_count_distinct.
  ```

- `percentile_approx`

  ``` sql
  Usage:
      percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
        column `col` at the given percentage. The value of percentage must be between 0.0
        and 1.0. The `accuracy` parameter (default: 10000) is a positive numeric literal which
        controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields
        better accuracy, `1.0/accuracy` is the relative error of the approximation.
        When `percentage` is an array, each value of the percentage array must be between 0.0 and 1.0.
        In this case, returns the approximate percentile array of column `col` at the given
        percentage array.

  Extended Usage:
      Examples:
        > SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100);
         [10.0,10.0,10.0]
        > SELECT percentile_approx(10.0, 0.5, 100);
         10.0
  ```
## How was this patch tested?

Manually tested

**When examples are multiple**

``` sql
spark-sql> describe function extended reflect;
Function: reflect
Class: org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection
Usage: reflect(class, method[, arg1[, arg2 ..]]) - Calls a method with reflection.
Extended Usage:
    Examples:
      > SELECT reflect('java.util.UUID', 'randomUUID');
       c33fb387-8500-4bfa-81d2-6e0e3e930df2
      > SELECT reflect('java.util.UUID', 'fromString', 'a5cf6c42-0c85-418f-af6c-3e4e5b1328f2');
       a5cf6c42-0c85-418f-af6c-3e4e5b1328f2
```

**When `Usage` is in single line**

``` sql
spark-sql> describe function extended min;
Function: min
Class: org.apache.spark.sql.catalyst.expressions.aggregate.Min
Usage: min(expr) - Returns the minimum value of `expr`.
Extended Usage:
    No example/argument for min.
```

**When `Usage` is already in multiple lines**

``` sql
spark-sql> describe function extended percentile_approx;
Function: percentile_approx
Class: org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
Usage:
    percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
      column `col` at the given percentage. The value of percentage must be between 0.0
      and 1.0. The `accuracy` parameter (default: 10000) is a positive numeric literal which
      controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields
      better accuracy, `1.0/accuracy` is the relative error of the approximation.
      When `percentage` is an array, each value of the percentage array must be between 0.0 and 1.0.
      In this case, returns the approximate percentile array of column `col` at the given
      percentage array.

Extended Usage:
    Examples:
      > SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100);
       [10.0,10.0,10.0]
      > SELECT percentile_approx(10.0, 0.5, 100);
       10.0
```

**When example/argument is missing**

``` sql
spark-sql> describe function extended rank;
Function: rank
Class: org.apache.spark.sql.catalyst.expressions.Rank
Usage:
    rank() - Computes the rank of a value in a group of values. The result is one plus the number
      of rows preceding or equal to the current row in the ordering of the partition. The values
      will produce gaps in the sequence.

Extended Usage:
    No example/argument for rank.
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15677 from HyukjinKwon/SPARK-17963-1.
2016-11-02 20:56:30 -07:00
Wenchen Fan 3a1bc6f478 [SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table
## What changes were proposed in this pull request?

Due to a limitation of hive metastore(table location must be directory path, not file path), we always store `path` for data source table in storage properties, instead of the `locationUri` field. However, we should not expose this difference to `CatalogTable` level, but just treat it as a hack in `HiveExternalCatalog`, like we store table schema of data source table in table properties.

This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`, both data source table and hive serde table should use the `locationUri` field.

This PR also unifies the way we handle default table location for managed table. Previously, the default table location of hive serde managed table is set by external catalog, but the one of data source table is set by command. After this PR, we follow the hive way and the default table location is always set by external catalog.

For managed non-file-based tables, we will assign a default table location and create an empty directory for it, the table location will be removed when the table is dropped. This is reasonable as metastore doesn't care about whether a table is file-based or not, and an empty table directory has no harm.
For external non-file-based tables, ideally we can omit the table location, but due to a hive metastore issue, we will assign a random location to it, and remove it right after the table is created. See SPARK-15269 for more details. This is fine as it's well isolated in `HiveExternalCatalog`.

To keep the existing behaviour of the `path` option, in this PR we always add the `locationUri` to storage properties using key `path`, before passing storage properties to `DataSource` as data source options.
## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15024 from cloud-fan/path.
2016-11-02 18:05:14 -07:00
Xiangrui Meng 02f203107b [SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union
## What changes were proposed in this pull request?

When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following:
- The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations.

However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column.

See the unit tests below or JIRA for examples.

This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization.
## How was this patch tested?

Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...)

cc: rxin davies

Author: Xiangrui Meng <meng@databricks.com>

Closes #15567 from mengxr/SPARK-14393.
2016-11-02 11:41:49 -07:00
eyal farago f151bd1af8 [SPARK-16839][SQL] Simplify Struct creation code path
## What changes were proposed in this pull request?

Simplify struct creation, especially the aspect of `CleanupAliases` which missed some aliases when handling trees created by `CreateStruct`.

This PR includes:

1. A failing test (create struct with nested aliases, some of the aliases survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved.
5. `CleanupAliases` code was simplified as it no longer has to deal with `CreateStruct`'s top level columns.

## How was this patch tested?
Running all tests-suits in package org.apache.spark.sql, especially including the analysis suite, making sure added test initially fails, after applying suggested fix rerun the entire analysis package successfully.

Modified few tests that expected `CreateStruct` which is now transformed into `CreateNamedStruct`.

Author: eyal farago <eyal farago>
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: eyal farago <eyal.farago@gmail.com>
Author: Eyal Farago <eyal.farago@actimize.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Author: eyalfa <eyal.farago@gmail.com>

Closes #15718 from hvanhovell/SPARK-16839-2.
2016-11-02 11:12:20 +01:00
Sean Owen 9c8deef64e
[SPARK-18076][CORE][SQL] Fix default Locale used in DateFormat, NumberFormat to Locale.US
## What changes were proposed in this pull request?

Fix `Locale.US` for all usages of `DateFormat`, `NumberFormat`
## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15610 from srowen/SPARK-18076.
2016-11-02 09:39:15 +00:00
Eric Liang abefe2ec42 [SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables
## What changes were proposed in this pull request?

There are a couple issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive.

(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged.

There is one other issue in that INSERT OVERWRITE with dynamic partitions will overwrite the entire table instead of just the updated partitions, but this behavior is pretty complicated to implement for Datasource tables. We should address that in a future release.

## How was this patch tested?

Unit tests.

Author: Eric Liang <ekl@databricks.com>

Closes #15705 from ericl/sc-4942.
2016-11-02 14:15:10 +08:00
Michael Allman 1bbf9ff634 [SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)
## What changes were proposed in this pull request?

We recently added table partition pruning for partitioned Hive tables converted to using `TableFileCatalog`. When the Hive configuration option `hive.metastore.try.direct.sql` is set to `false`, Hive will throw an exception for unsupported filter expressions. For example, attempting to filter on an integer partition column will throw a `org.apache.hadoop.hive.metastore.api.MetaException`.

I discovered this behavior because VideoAmp uses the CDH version of Hive with a Postgresql metastore DB. In this configuration, CDH sets `hive.metastore.try.direct.sql` to `false` by default, and queries that filter on a non-string partition column will fail.

Rather than throw an exception in query planning, this patch catches this exception, logs a warning and returns all table partitions instead. Clients of this method are already expected to handle the possibility that the filters will not be honored.
## How was this patch tested?

A unit test was added.

Author: Michael Allman <michael@videoamp.com>

Closes #15673 from mallman/spark-17992-catch_hive_partition_filter_exception.
2016-11-01 22:20:19 -07:00
Eric Liang cfac17ee1c [SPARK-18167] Disable flaky SQLQuerySuite test
We now know it's a persistent environmental issue that is causing this test to sometimes fail. One hypothesis is that some configuration is leaked from another suite, and depending on suite ordering this can cause this test to fail.

I am planning on mining the jenkins logs to try to narrow down which suite could be causing this. For now, disable the test.

Author: Eric Liang <ekl@databricks.com>

Closes #15720 from ericl/disable-flaky-test.
2016-11-01 12:35:34 -07:00
Herman van Hovell 0cba535af3 Revert "[SPARK-16839][SQL] redundant aliases after cleanupAliases"
This reverts commit 5441a6269e.
2016-11-01 17:30:37 +01:00
eyal farago 5441a6269e [SPARK-16839][SQL] redundant aliases after cleanupAliases
## What changes were proposed in this pull request?

Simplify struct creation, especially the aspect of `CleanupAliases` which missed some aliases when handling trees created by `CreateStruct`.

This PR includes:

1. A failing test (create struct with nested aliases, some of the aliases survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved.
5. `CleanupAliases` code was simplified as it no longer has to deal with `CreateStruct`'s top level columns.

## How was this patch tested?

running all tests-suits in package org.apache.spark.sql, especially including the analysis suite, making sure added test initially fails, after applying suggested fix rerun the entire analysis package successfully.

modified few tests that expected `CreateStruct` which is now transformed into `CreateNamedStruct`.

Credit goes to hvanhovell for assisting with this PR.

Author: eyal farago <eyal farago>
Author: eyal farago <eyal.farago@gmail.com>
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: Eyal Farago <eyal.farago@actimize.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Author: eyalfa <eyal.farago@gmail.com>

Closes #14444 from eyalfa/SPARK-16839_redundant_aliases_after_cleanupAliases.
2016-11-01 17:12:20 +01:00
Liang-Chi Hsieh dd85eb5448 [SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql than it does in hive-client
## What changes were proposed in this pull request?

As reported on the jira, insert overwrite statement runs much slower in Spark, compared with hive-client.

It seems there is a patch [HIVE-11940](ba21806b77) which largely improves insert overwrite performance on Hive. HIVE-11940 is patched after Hive 2.0.0.

Because Spark SQL uses older Hive library, we can not benefit from such improvement.

The reporter verified that there is also a big performance gap between Hive 1.2.1 (520.037 secs) and Hive 2.0.1 (35.975 secs) on insert overwrite execution.

Instead of upgrading to Hive 2.0 in Spark SQL, which might not be a trivial task, this patch provides an approach to delete the partition before asking Hive to load data files into the partition.

Note: The case reported on the jira is insert overwrite to partition. Since `Hive.loadTable` also uses the function to replace files, insert overwrite to table should has the same issue. We can take the same approach to delete the table first. I will upgrade this to include this.
## How was this patch tested?

Jenkins tests.

There are existing tests using insert overwrite statement. Those tests should be passed. I added a new test to specially test insert overwrite into partition.

For performance issue, as I don't have Hive 2.0 environment, this needs the reporter to verify it. Please refer to the jira.

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15667 from viirya/improve-hive-insertoverwrite.
2016-11-01 00:24:08 -07:00
Reynold Xin d9d1465009 [SPARK-18024][SQL] Introduce an internal commit protocol API
## What changes were proposed in this pull request?
This patch introduces an internal commit protocol API that is used by the batch data source to do write commits. It currently has only one implementation that uses Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits.

## How was this patch tested?
Should be covered by existing write tests.

Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>

Closes #15707 from rxin/SPARK-18024-2.
2016-10-31 22:23:38 -07:00
Eric Liang 7d6c87155c [SPARK-18167][SQL] Retry when the SQLQuerySuite test flakes
## What changes were proposed in this pull request?

This will re-run the flaky test a few times after it fails. This will help determine if it's due to nondeterministic test setup, or because of some environment issue (e.g. leaked config from another test).

cc yhuai

Author: Eric Liang <ekl@databricks.com>

Closes #15708 from ericl/spark-18167-3.
2016-10-31 20:23:22 -07:00
Eric Liang 6633b97b57 [SPARK-18167][SQL] Also log all partitions when the SQLQuerySuite test flakes
## What changes were proposed in this pull request?

One possibility for this test flaking is that we have corrupted the partition schema somehow in the tests, which causes the cast to decimal to fail in the call. This should at least show us the actual partition values.

## How was this patch tested?

Run it locally, it prints out something like `ArrayBuffer(test(partcol=0), test(partcol=1), test(partcol=2), test(partcol=3), test(partcol=4))`.

Author: Eric Liang <ekl@databricks.com>

Closes #15701 from ericl/print-more-info.
2016-10-31 16:26:52 -07:00
Eric Liang 90d3b91f4c [SPARK-18103][SQL] Rename *FileCatalog to *FileIndex
## What changes were proposed in this pull request?

To reduce the number of components in SQL named *Catalog, rename *FileCatalog to *FileIndex. A FileIndex is responsible for returning the list of partitions / files to scan given a filtering expression.

```
TableFileCatalog => CatalogFileIndex
FileCatalog => FileIndex
ListingFileCatalog => InMemoryFileIndex
MetadataLogFileCatalog => MetadataLogFileIndex
PrunedTableFileCatalog => PrunedInMemoryFileIndex
```

cc yhuai marmbrus

## How was this patch tested?

N/A

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15634 from ericl/rename-file-provider.
2016-10-30 13:14:45 -07:00
Eric Liang d2d438d1d5 [SPARK-18167][SQL] Add debug code for SQLQuerySuite flakiness when metastore partition pruning is enabled
## What changes were proposed in this pull request?

org.apache.spark.sql.hive.execution.SQLQuerySuite is flaking when hive partition pruning is enabled.
Based on the stack traces, it seems to be an old issue where Hive fails to cast a numeric partition column ("Invalid character string format for type DECIMAL"). There are two possibilities here: either we are somehow corrupting the partition table to have non-decimal values in that column, or there is a transient issue with Derby.

This PR logs the result of the retry when this exception is encountered, so we can confirm what is going on.

## How was this patch tested?

n/a

cc yhuai

Author: Eric Liang <ekl@databricks.com>

Closes #15676 from ericl/spark-18167.
2016-10-29 06:49:57 +02:00
Sunitha Kambhampati ab5f938bc7 [SPARK-18121][SQL] Unable to query global temp views when hive support is enabled
## What changes were proposed in this pull request?

Issue:
Querying on a global temp view throws Table or view not found exception.

Fix:
Update the lookupRelation in HiveSessionCatalog to check for global temp views similar to the SessionCatalog.lookupRelation.

Before fix:
Querying on a global temp view ( for. e.g.:  select * from global_temp.v1)  throws Table or view not found exception

After fix:
Query succeeds and returns the right result.

## How was this patch tested?
- Two unit tests are added to check for global temp view for the code path when hive support is enabled.
- Regression unit tests were run successfully. ( build/sbt -Phive hive/test, build/sbt sql/test, build/sbt catalyst/test)

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #15649 from skambha/lookuprelationChanges.
2016-10-28 08:39:02 +08:00
Eric Liang ccb1154304 [SPARK-17970][SQL] store partition spec in metastore for data source table
## What changes were proposed in this pull request?

We should follow hive table and also store partition spec in metastore for data source table.
This brings 2 benefits:

1. It's more flexible to manage the table data files, as users can use `ADD PARTITION`, `DROP PARTITION` and `RENAME PARTITION`
2. We don't need to cache all file status for data source table anymore.

## How was this patch tested?

existing tests.

Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #15515 from cloud-fan/partition.
2016-10-27 14:22:30 -07:00
Wenchen Fan 6f31833dbe [SPARK-18026][SQL] should not always lowercase partition columns of partition spec in parser
## What changes were proposed in this pull request?

Currently we always lowercase the partition columns of partition spec in parser, with the assumption that table partition columns are always lowercased.

However, this is not true for data source tables, which are case preserving. It's safe for now because data source tables don't store partition spec in metastore and don't support `ADD PARTITION`, `DROP PARTITION`, `RENAME PARTITION`, but we should make our code future-proof.

This PR makes partition spec case preserving at parser, and improve the `PreprocessTableInsertion` analyzer rule to normalize the partition columns in partition spec, w.r.t. the table partition columns.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15566 from cloud-fan/partition-spec.
2016-10-25 15:00:33 +08:00
gatorsmile d479c52622 [SPARK-17409][SQL][FOLLOW-UP] Do Not Optimize Query in CTAS More Than Once
### What changes were proposed in this pull request?
This follow-up PR is for addressing the [comment](https://github.com/apache/spark/pull/15048).

We added two test cases based on the suggestion from yhuai . One is a new test case using the `saveAsTable` API to create a data source table. Another is for CTAS on Hive serde table.

Note: No need to backport this PR to 2.0. Will submit a new PR to backport the whole fix with new test cases to Spark 2.0

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15459 from gatorsmile/ctasOptimizedTestCases.
2016-10-25 10:47:11 +08:00
Wenchen Fan 84a3399908 [SPARK-18028][SQL] simplify TableFileCatalog
## What changes were proposed in this pull request?

Simplify/cleanup TableFileCatalog:

1. pass a `CatalogTable` instead of `databaseName` and `tableName` into `TableFileCatalog`, so that we don't need to fetch table metadata from metastore again
2. In `TableFileCatalog.filterPartitions0`, DO NOT set `PartitioningAwareFileCatalog.BASE_PATH_PARAM`. According to the [classdoc](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L189-L209), the default value of `basePath` already satisfies our need. What's more, if we set this parameter, we may break the case 2 which is metioned in the classdoc.
3. add `equals` and `hashCode` to `TableFileCatalog`
4. add `SessionCatalog.listPartitionsByFilter` which handles case sensitivity.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15568 from cloud-fan/table-file-catalog.
2016-10-25 08:42:21 +08:00
Sean Owen 4ecbe1b92f
[SPARK-17810][SQL] Default spark.sql.warehouse.dir is relative to local FS but can resolve as HDFS path
## What changes were proposed in this pull request?

Always resolve spark.sql.warehouse.dir as a local path, and as relative to working dir not home dir

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15382 from srowen/SPARK-17810.
2016-10-24 10:44:45 +01:00
jiangxingbo b158256c2e [SPARK-18045][SQL][TESTS] Move HiveDataFrameAnalyticsSuite to package sql
## What changes were proposed in this pull request?

The testsuite `HiveDataFrameAnalyticsSuite` has nothing to do with HIVE, we should move it to package `sql`.
The original test cases in that suite are splited into two existing testsuites: `DataFrameAggregateSuite` tests for the functions and ~~`SQLQuerySuite`~~`SQLQueryTestSuite` tests for the SQL statements.

## How was this patch tested?
~~Modified `SQLQuerySuite` in package `sql`.~~
Add query file for `SQLQueryTestSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15582 from jiangxb1987/group-analytics-test.
2016-10-23 13:28:35 +02:00
Tejas Patil 21c7539a52 [SPARK-18038][SQL] Move output partitioning definition from UnaryNodeExec to its children
## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-18038

This was a suggestion by rxin over one of the dev list discussion : http://apache-spark-developers-list.1001551.n3.nabble.com/Project-not-preserving-child-partitioning-td19417.html

His words:

>> It would be better (safer) to move the output partitioning definition into each of the operator and remove it from UnaryExecNode.

With this PR, following is the output partitioning and ordering for all the impls of `UnaryExecNode`.

UnaryExecNode's impl | outputPartitioning | outputOrdering | comment
------------ | ------------- | ------------ | ------------
AppendColumnsExec | child's | Nil | child's ordering can be used
AppendColumnsWithObjectExec | child's | Nil | child's ordering can be used
BroadcastExchangeExec | BroadcastPartitioning | Nil | -
CoalesceExec | UnknownPartitioning | Nil | -
CollectLimitExec | SinglePartition | Nil | -
DebugExec | child's | Nil | child's ordering can be used
DeserializeToObjectExec | child's | Nil | child's ordering can be used
ExpandExec | UnknownPartitioning | Nil | -
FilterExec | child's | child's | -
FlatMapGroupsInRExec | child's | Nil | child's ordering can be used
GenerateExec | child's | Nil | need to dig more
GlobalLimitExec | child's | child's | -
HashAggregateExec | child's | Nil | -
InputAdapter | child's | child's | -
InsertIntoHiveTable | child's | Nil | terminal node, doesn't need partitioning
LocalLimitExec | child's | child's | -
MapElementsExec | child's | child's | -
MapGroupsExec | child's | Nil | child's ordering can be used
MapPartitionsExec | child's | Nil | child's ordering can be used
ProjectExec | child's | child's | -
SampleExec | child's | Nil | child's ordering can be used
ScriptTransformation | child's | Nil | child's ordering can be used
SerializeFromObjectExec | child's | Nil | child's ordering can be used
ShuffleExchange | custom | Nil | -
SortAggregateExec | child's | sort over grouped exprs | -
SortExec | child's | custom | -
StateStoreRestoreExec  | child's | Nil | child's ordering can be used
StateStoreSaveExec | child's | Nil | child's ordering can be used
SubqueryExec | child's | child's | -
TakeOrderedAndProjectExec | SinglePartition | custom | -
WholeStageCodegenExec | child's | child's | -
WindowExec | child's | child's | -

## How was this patch tested?

This does NOT change any existing functionality so relying on existing tests

Author: Tejas Patil <tejasp@fb.com>

Closes #15575 from tejasapatil/SPARK-18038_UnaryNodeExec_output_partitioning.
2016-10-23 13:25:47 +02:00
Tejas Patil eff4aed1ac [SPARK-18035][SQL] Introduce performant and memory efficient APIs to create ArrayBasedMapData
## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-18035

In HiveInspectors, I saw that converting Java map to Spark's `ArrayBasedMapData` spent quite sometime in buffer copying : https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala#L658

The reason being `map.toSeq` allocates a new buffer and copies the map entries to it: https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/MapLike.scala#L323

This copy is not needed as we get rid of it once we extract the key and value arrays.

Here is the call trace:

```
org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41.apply(HiveInspectors.scala:664)
scala.collection.AbstractMap.toSeq(Map.scala:59)
scala.collection.MapLike$class.toSeq(MapLike.scala:323)
scala.collection.AbstractMap.toBuffer(Map.scala:59)
scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
```

Also, earlier code was populating keys and values arrays separately by iterating twice. The PR avoids double iteration of the map and does it in one iteration.

EDIT: During code review, there were several more places in the code which were found to do similar thing. The PR dedupes those instances and introduces convenient APIs which are performant and memory efficient

## Performance gains

The number is subjective and depends on how many map columns are accessed in the query and average entries per map. For one the queries that I tried out, I saw 3% CPU savings (end-to-end) for the query.

## How was this patch tested?

This does not change the end result produced so relying on existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #15573 from tejasapatil/SPARK-18035_avoid_toSeq.
2016-10-22 20:43:43 -07:00
Eric Liang 3eca283aca [SPARK-17994][SQL] Add back a file status cache for catalog tables
## What changes were proposed in this pull request?

In SPARK-16980, we removed the full in-memory cache of table partitions in favor of loading only needed partitions from the metastore. This greatly improves the initial latency of queries that only read a small fraction of table partitions.

However, since the metastore does not store file statistics, we need to discover those from remote storage. With the loss of the in-memory file status cache this has to happen on each query, increasing the latency of repeated queries over the same partitions.

The proposal is to add back a per-table cache of partition contents, i.e. Map[Path, Array[FileStatus]]. This cache would be retained per-table, and can be invalidated through refreshTable() and refreshByPath(). Unlike the prior cache, it can be incrementally updated as new partitions are read.

## How was this patch tested?

Existing tests and new tests in `HiveTablePerfStatsSuite`.

cc mallman

Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15539 from ericl/meta-cache.
2016-10-22 22:08:28 +08:00
Reynold Xin 3fbf5a58c2 [SPARK-18042][SQL] OutputWriter should expose file path written
## What changes were proposed in this pull request?
This patch adds a new "path" method on OutputWriter that returns the path of the file written by the OutputWriter. This is part of the necessary work to consolidate structured streaming and batch write paths.

The batch write path has a nice feature that each data source can define the extension of the files, and allow Spark to specify the staging directory and the prefix for the files. However, in the streaming path we need to collect the list of files written, and there is no interface right now to do that.

## How was this patch tested?
N/A - there is no behavior change and this should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15580 from rxin/SPARK-18042.
2016-10-21 17:27:18 -07:00
Wenchen Fan 57e97fcbd6 [SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation
## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we will replace the `LogicalRelation` with a pruned one. However, this replacement may change the output of the `LogicalRelation` if it doesn't have `expectedOutputAttributes`. This PR fixes it.

## How was this patch tested?

the new `PruneFileSourcePartitionsSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15569 from cloud-fan/partition-bug.
2016-10-21 12:27:53 +08:00
Reynold Xin 7f9ec19eae [SPARK-18021][SQL] Refactor file name specification for data sources
## What changes were proposed in this pull request?
Currently each data source OutputWriter is responsible for specifying the entire file name for each file output. This, however, does not make any sense because we rely on file naming schemes for certain behaviors in Spark SQL, e.g. bucket id. The current approach allows individual data sources to break the implementation of bucketing.

On the flip side, we also don't want to move file naming entirely out of data sources, because different data sources do want to specify different extensions.

This patch divides file name specification into two parts: the first part is a prefix specified by the caller of OutputWriter (in WriteOutput), and the second part is the suffix that can be specified by the OutputWriter itself. Note that a side effect of this change is that now all file based data sources also support bucketing automatically.

There are also some other minor cleanups:

- Removed the UUID passed through generic Configuration string
- Some minor rewrites for better clarity
- Renamed "path" in multiple places to "stagingDir", to more accurately reflect its meaning

## How was this patch tested?
This should be covered by existing data source tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15562 from rxin/SPARK-18021.
2016-10-20 12:18:56 -07:00
Tejas Patil fb0894b3a8 [SPARK-17698][SQL] Join predicates should not contain filter clauses
## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-17698

`ExtractEquiJoinKeys` is incorrectly using filter predicates as the join condition for joins. `canEvaluate` [0] tries to see if the an `Expression` can be evaluated using output of a given `Plan`. In case of filter predicates (eg. `a.id='1'`), the `Expression` passed for the right hand side (ie. '1' ) is a `Literal` which does not have any attribute references. Thus `expr.references` is an empty set which theoretically is a subset of any set. This leads to `canEvaluate` returning `true` and `a.id='1'` is treated as a join predicate. While this does not lead to incorrect results but in case of bucketed + sorted tables, we might miss out on avoiding un-necessary shuffle + sort. See example below:

[0] : https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L91

eg.

```
val df = (1 until 10).toDF("id").coalesce(1)
hc.sql("DROP TABLE IF EXISTS table1").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
hc.sql("DROP TABLE IF EXISTS table2").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")

sqlContext.sql("""
  SELECT a.id, b.id
  FROM table1 a
  FULL OUTER JOIN table2 b
  ON a.id = b.id AND a.id='1' AND b.id='1'
""").explain(true)
```

BEFORE: This is doing shuffle + sort over table scan outputs which is not needed as both tables are bucketed and sorted on the same columns and have same number of buckets. This should be a single stage job.

```
SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
:- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
:     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
      +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

AFTER :

```
SortMergeJoin [id#32], [id#33], FullOuter, ((cast(id#32 as double) = 1.0) && (cast(id#33 as double) = 1.0))
:- *FileScan parquet default.table1[id#32] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *FileScan parquet default.table2[id#33] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

## How was this patch tested?

- Added a new test case for this scenario : `SPARK-17698 Join predicates should not contain filter clauses`
- Ran all the tests in `BucketedReadSuite`

Author: Tejas Patil <tejasp@fb.com>

Closes #15272 from tejasapatil/SPARK-17698_join_predicate_filter_clause.
2016-10-20 09:50:55 -07:00
Dilip Biswal e895bc2548 [SPARK-17860][SQL] SHOW COLUMN's database conflict check should respect case sensitivity configuration
## What changes were proposed in this pull request?
SHOW COLUMNS command validates the user supplied database
name with database name from qualified table name name to make
sure both of them are consistent. This comparison should respect
case sensitivity.

## How was this patch tested?
Added tests in DDLSuite and existing tests were moved to use new sql based test infrastructure.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15423 from dilipbiswal/dkb_show_column_fix.
2016-10-20 19:39:25 +08:00
Dongjoon Hyun 986a3b8b5b
[SPARK-17796][SQL] Support wildcard character in filename for LOAD DATA LOCAL INPATH
## What changes were proposed in this pull request?

Currently, Spark 2.0 raises an `input path does not exist` AnalysisException if the file name contains '*'. It is misleading since it occurs when there exist some matched files. Also, it was a supported feature in Spark 1.6.2. This PR aims to support wildcard characters in filename for `LOAD DATA LOCAL INPATH` SQL command like Spark 1.6.2.

**Reported Error Scenario**
```scala
scala> sql("CREATE TABLE t(a string)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("LOAD DATA LOCAL INPATH '/tmp/x*' INTO TABLE t")
org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: /tmp/x*;
```

## How was this patch tested?

Pass the Jenkins test with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15376 from dongjoon-hyun/SPARK-17796.
2016-10-20 09:53:12 +01:00
Eric Liang 5f20ae0394 [SPARK-17980][SQL] Fix refreshByPath for converted Hive tables
## What changes were proposed in this pull request?

There was a bug introduced in https://github.com/apache/spark/pull/14690 which broke refreshByPath with converted hive tables (though, it turns out it was very difficult to refresh converted hive tables anyways, since you had to specify the exact path of one of the partitions).

This changes refreshByPath to invalidate by prefix instead of exact match, and fixes the issue.

cc sameeragarwal for refreshByPath changes
mallman

## How was this patch tested?

Extended unit test.

Author: Eric Liang <ekl@databricks.com>

Closes #15521 from ericl/fix-caching.
2016-10-19 10:20:12 +08:00
Eric Liang 4ef39c2f44 [SPARK-17974] try 2) Refactor FileCatalog classes to simplify the inheritance tree
## What changes were proposed in this pull request?

This renames `BasicFileCatalog => FileCatalog`, combines  `SessionFileCatalog` with `PartitioningAwareFileCatalog`, and removes the old `FileCatalog` trait.

In summary,
```
MetadataLogFileCatalog extends PartitioningAwareFileCatalog
ListingFileCatalog extends PartitioningAwareFileCatalog
PartitioningAwareFileCatalog extends FileCatalog
TableFileCatalog extends FileCatalog
```

(note that this is a re-submission of https://github.com/apache/spark/pull/15518 which got reverted)

## How was this patch tested?

Existing tests

Author: Eric Liang <ekl@databricks.com>

Closes #15533 from ericl/fix-scalastyle-revert.
2016-10-18 13:33:46 -07:00
Wenchen Fan e59df62e62 [SPARK-17899][SQL][FOLLOW-UP] debug mode should work for corrupted table
## What changes were proposed in this pull request?

Debug mode should work for corrupted table, so that we can really debug

## How was this patch tested?

new test in `MetastoreDataSourcesSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15528 from cloud-fan/debug.
2016-10-18 11:03:10 -07:00
Reynold Xin 1c5a7d7f64 Revert "[SPARK-17974] Refactor FileCatalog classes to simplify the inheritance tree"
This reverts commit 8daa1a29b6.
2016-10-17 21:26:28 -07:00
Eric Liang 8daa1a29b6 [SPARK-17974] Refactor FileCatalog classes to simplify the inheritance tree
## What changes were proposed in this pull request?

This renames `BasicFileCatalog => FileCatalog`, combines  `SessionFileCatalog` with `PartitioningAwareFileCatalog`, and removes the old `FileCatalog` trait.

In summary,
```
MetadataLogFileCatalog extends PartitioningAwareFileCatalog
ListingFileCatalog extends PartitioningAwareFileCatalog
PartitioningAwareFileCatalog extends FileCatalog
TableFileCatalog extends FileCatalog
```

cc cloud-fan mallman

## How was this patch tested?

Existing tests

Author: Eric Liang <ekl@databricks.com>

Closes #15518 from ericl/refactor-session-file-catalog.
2016-10-17 21:01:22 -07:00
Dilip Biswal 813ab5e025 [SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables
## What changes were proposed in this pull request?
Reopens the closed PR https://github.com/apache/spark/pull/15190
(Please refer to the above link for review comments on the PR)

Make sure the hive.default.fileformat is used to when creating the storage format metadata.

Output
``` SQL
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```
Before
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]
```
After
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]

```
## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
Added new tests to HiveDDLCommandSuite, SQLQuerySuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15495 from dilipbiswal/orc2.
2016-10-17 20:46:30 -07:00
Michael Allman 6ce1b675ee [SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query
(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table are retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query. When querying a table with a large number of partitions for some data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhaust all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), however this does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter` to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
1. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes-away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog `. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog. This queries the Hive metastore, passing along those filters.

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity. This issue may be fixed by #14750, but that PR appears to have stalled. ericl has contributed to this PR a workaround for Parquet wherein schema reconciliation occurs at query execution time instead of planning. Whether ORC requires a similar patch is an open issue.
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. This PR breaks parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session, but I haven't figured out how to fix this properly.

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #14690 from mallman/spark-16980-lazy_partition_fetching.
2016-10-14 18:26:18 -07:00
Yin Huai 522dd0d0e5 Revert "[SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables"
This reverts commit 7ab86244e3.
2016-10-14 14:09:35 -07:00
Dilip Biswal 7ab86244e3 [SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables
## What changes were proposed in this pull request?
Make sure the hive.default.fileformat is used to when creating the storage format metadata.

Output
``` SQL
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```
Before
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]
```
After
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]

```

## How was this patch tested?
Added new tests to HiveDDLCommandSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15190 from dilipbiswal/orc.
2016-10-14 13:22:59 -07:00
wangzhenhua 7486442fe0 [SPARK-17073][SQL][FOLLOWUP] generate column-level statistics
## What changes were proposed in this pull request?
This pr adds some test cases for statistics: case sensitive column names, non ascii column names, refresh table, and also improves some documentation.

## How was this patch tested?
add test cases

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #15360 from wzhfy/colStats2.
2016-10-14 21:18:49 +08:00
Wenchen Fan 2fb12b0a33 [SPARK-17903][SQL] MetastoreRelation should talk to external catalog instead of hive client
## What changes were proposed in this pull request?

`HiveExternalCatalog` should be the only interface to talk to the hive metastore. In `MetastoreRelation` we can just use `ExternalCatalog` instead of `HiveClient` to interact with hive metastore,  and add missing API in `ExternalCatalog`.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15460 from cloud-fan/relation.
2016-10-14 15:53:50 +08:00
Pete Robbins 84f149e414 [SPARK-17827][SQL] maxColLength type should be Int for String and Binary
## What changes were proposed in this pull request?
correct the expected type from Length function to be Int

## How was this patch tested?
Test runs on little endian and big endian platforms

Author: Pete Robbins <robbinspg@gmail.com>

Closes #15464 from robbinspg/SPARK-17827.
2016-10-13 11:26:30 -07:00
gatorsmile 0a8e51a5e4 [SPARK-17657][SQL] Disallow Users to Change Table Type
### What changes were proposed in this pull request?
Hive allows users to change the table type from `Managed` to `External` or from `External` to `Managed` by altering table's property `EXTERNAL`. See the JIRA: https://issues.apache.org/jira/browse/HIVE-1329

So far, Spark SQL does not correctly support it, although users can do it. Many assumptions are broken in the implementation. Thus, this PR is to disallow users to change it.

In addition, we also do not allow users to set the property `EXTERNAL` when creating a table.

### How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15230 from gatorsmile/alterTableSetExternal.
2016-10-13 21:36:39 +08:00
Wenchen Fan db8784feaa [SPARK-17899][SQL] add a debug mode to keep raw table properties in HiveExternalCatalog
## What changes were proposed in this pull request?

Currently `HiveExternalCatalog` will filter out the Spark SQL internal table properties, e.g. `spark.sql.sources.provider`, `spark.sql.sources.schema`, etc. This is reasonable for external users as they don't want to see these internal properties in `DESC TABLE`.

However, as a Spark developer, sometimes we do wanna see the raw table properties. This PR adds a new internal SQL conf, `spark.sql.debug`, to enable debug mode and keep these raw table properties.

This config can also be used in similar places where we wanna retain debug information in the future.

## How was this patch tested?

new test in MetastoreDataSourcesSuite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15458 from cloud-fan/debug.
2016-10-13 03:26:29 -04:00