Commit graph

4468 commits

Author SHA1 Message Date
Michael Allman 6ce1b675ee [SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query
(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table are retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query. When querying a table with a large number of partitions for some data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhaust all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), however this does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter` to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
1. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes-away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog `. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog. This queries the Hive metastore, passing along those filters.

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity. This issue may be fixed by #14750, but that PR appears to have stalled. ericl has contributed to this PR a workaround for Parquet wherein schema reconciliation occurs at query execution time instead of planning. Whether ORC requires a similar patch is an open issue.
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. This PR breaks parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session, but I haven't figured out how to fix this properly.

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #14690 from mallman/spark-16980-lazy_partition_fetching.
2016-10-14 18:26:18 -07:00
Srinath Shankar 2d96d35dc0 [SPARK-17946][PYSPARK] Python crossJoin API similar to Scala
## What changes were proposed in this pull request?

Add a crossJoin function to the DataFrame API similar to that in Scala. Joins with no condition (cartesian products) must be specified with the crossJoin API

## How was this patch tested?
Added python tests to ensure that an AnalysisException if a cartesian product is specified without crossJoin(), and that cartesian products can execute if specified via crossJoin()

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

Author: Srinath Shankar <srinath@databricks.com>

Closes #15493 from srinathshankar/crosspython.
2016-10-14 18:24:47 -07:00
Reynold Xin 72adfbf94a [SPARK-17900][SQL] Graduate a list of Spark SQL APIs to stable
## What changes were proposed in this pull request?
This patch graduates a list of Spark SQL APIs and mark them stable.

The following are marked stable:

Dataset/DataFrame
- functions, since 1.3
- ColumnName, since 1.3
- DataFrameNaFunctions, since 1.3.1
- DataFrameStatFunctions, since 1.4
- UserDefinedFunction, since 1.3
- UserDefinedAggregateFunction, since 1.5
- Window and WindowSpec, since 1.4

Data sources:
- DataSourceRegister, since 1.5
- RelationProvider, since 1.3
- SchemaRelationProvider, since 1.3
- CreatableRelationProvider, since 1.3
- BaseRelation, since 1.3
- TableScan, since 1.3
- PrunedScan, since 1.3
- PrunedFilteredScan, since 1.3
- InsertableRelation, since 1.3

The following are kept experimental / evolving:

Data sources:
- CatalystScan (tied to internal logical plans so it is not stable by definition)

Structured streaming:
- all classes (introduced new in 2.0 and will likely change)

Dataset typed operations (introduced in 1.6 and 2.0 and might change, although probability is low)
- all typed methods on Dataset
- KeyValueGroupedDataset
- o.a.s.sql.expressions.javalang.typed
- o.a.s.sql.expressions.scalalang.typed
- methods that return typed Dataset in SparkSession

We should discuss more whether we want to mark Dataset typed operations stable in 2.1.

## How was this patch tested?
N/A - just annotation changes.

Author: Reynold Xin <rxin@databricks.com>

Closes #15469 from rxin/SPARK-17900.
2016-10-14 16:13:42 -07:00
Jeff Zhang f00df40cfe [SPARK-11775][PYSPARK][SQL] Allow PySpark to register Java UDF
Currently pyspark can only call the builtin java UDF, but can not call custom java UDF. It would be better to allow that. 2 benefits:
* Leverage the power of rich third party java library
* Improve the performance. Because if we use python UDF, python daemons will be started on worker which will affect the performance.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #9766 from zjffdu/SPARK-11775.
2016-10-14 15:50:35 -07:00
Nick Pentreath 5aeb7384c7 [SPARK-16063][SQL] Add storageLevel to Dataset
[SPARK-11905](https://issues.apache.org/jira/browse/SPARK-11905) added support for `persist`/`cache` for `Dataset`. However, there is no user-facing API to check if a `Dataset` is cached and if so what the storage level is. This PR adds `getStorageLevel` to `Dataset`, analogous to `RDD.getStorageLevel`.

Updated `DatasetCacheSuite`.

Author: Nick Pentreath <nickp@za.ibm.com>

Closes #13780 from MLnick/ds-storagelevel.

Signed-off-by: Michael Armbrust <michael@databricks.com>
2016-10-14 15:09:49 -07:00
Davies Liu da9aeb0fde [SPARK-17863][SQL] should not add column into Distinct
## What changes were proposed in this pull request?

We are trying to resolve the attribute in sort by pulling up some column for grandchild into child, but that's wrong when the child is Distinct, because the added column will change the behavior of Distinct, we should not do that.

## How was this patch tested?

Added regression test.

Author: Davies Liu <davies@databricks.com>

Closes #15489 from davies/order_distinct.
2016-10-14 14:45:20 -07:00
Yin Huai 522dd0d0e5 Revert "[SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables"
This reverts commit 7ab86244e3.
2016-10-14 14:09:35 -07:00
Dilip Biswal 7ab86244e3 [SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables
## What changes were proposed in this pull request?
Make sure the hive.default.fileformat is used to when creating the storage format metadata.

Output
``` SQL
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```
Before
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]
```
After
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]

```

## How was this patch tested?
Added new tests to HiveDDLCommandSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15190 from dilipbiswal/orc.
2016-10-14 13:22:59 -07:00
Tathagata Das 05800b4b4e [TEST] Ignore flaky test in StreamingQueryListenerSuite
## What changes were proposed in this pull request?

Ignoring the flaky test introduced in #15307

https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/1736/testReport/junit/org.apache.spark.sql.streaming/StreamingQueryListenerSuite/single_listener__check_trigger_statuses/

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15491 from tdas/metrics-flaky-test.
2016-10-14 12:39:25 -07:00
Andrew Ash fa37877af0
Typo: form -> from
## What changes were proposed in this pull request?

Minor typo fix

## How was this patch tested?

Existing unit tests on Jenkins

Author: Andrew Ash <andrew@andrewash.com>

Closes #15486 from ash211/patch-8.
2016-10-14 18:13:19 +01:00
wangzhenhua 7486442fe0 [SPARK-17073][SQL][FOLLOWUP] generate column-level statistics
## What changes were proposed in this pull request?
This pr adds some test cases for statistics: case sensitive column names, non ascii column names, refresh table, and also improves some documentation.

## How was this patch tested?
add test cases

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #15360 from wzhfy/colStats2.
2016-10-14 21:18:49 +08:00
Wenchen Fan 2fb12b0a33 [SPARK-17903][SQL] MetastoreRelation should talk to external catalog instead of hive client
## What changes were proposed in this pull request?

`HiveExternalCatalog` should be the only interface to talk to the hive metastore. In `MetastoreRelation` we can just use `ExternalCatalog` instead of `HiveClient` to interact with hive metastore,  and add missing API in `ExternalCatalog`.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15460 from cloud-fan/relation.
2016-10-14 15:53:50 +08:00
Reynold Xin 6c29b3de76 [SPARK-17925][SQL] Break fileSourceInterfaces.scala into multiple pieces
## What changes were proposed in this pull request?
This patch does a few changes to the file structure of data sources:

- Break fileSourceInterfaces.scala into multiple pieces (HadoopFsRelation, FileFormat, OutputWriter)
- Move ParquetOutputWriter into its own file

I created this as a separate patch so it'd be easier to review my future PRs that focus on refactoring this internal logic. This patch only moves code around, and has no logic changes.

## How was this patch tested?
N/A - should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15473 from rxin/SPARK-17925.
2016-10-14 14:14:52 +08:00
Reynold Xin 8543996c3f [SPARK-17927][SQL] Remove dead code in WriterContainer.
## What changes were proposed in this pull request?
speculationEnabled and DATASOURCE_OUTPUTPATH seem like just dead code.

## How was this patch tested?
Tests should fail if they are not dead code.

Author: Reynold Xin <rxin@databricks.com>

Closes #15477 from rxin/SPARK-17927.
2016-10-14 12:35:59 +08:00
Jakob Odersky 9dc0ca060d [SPARK-17368][SQL] Add support for value class serialization and deserialization
## What changes were proposed in this pull request?
Value classes were unsupported because catalyst data types were
obtained through reflection on erased types, which would resolve to a
value class' wrapped type and hence lead to unavailable methods during
code generation.

E.g. the following class
```scala
case class Foo(x: Int) extends AnyVal
```
would be seen as an `int` in catalyst and will cause instance cast failures when generated java code tries to treat it as a `Foo`.

This patch simply removes the erasure step when getting data types for
catalyst.

## How was this patch tested?
Additional tests in `ExpressionEncoderSuite`.

Author: Jakob Odersky <jakob@odersky.com>

Closes #15284 from jodersky/value-classes.
2016-10-13 17:48:09 -07:00
petermaxlee adc112429d [SPARK-17661][SQL] Consolidate various listLeafFiles implementations
## What changes were proposed in this pull request?
There are 4 listLeafFiles-related functions in Spark:

- ListingFileCatalog.listLeafFiles (which calls HadoopFsRelation.listLeafFilesInParallel if the number of paths passed in is greater than a threshold; if it is lower, then it has its own serial version implemented)
- HadoopFsRelation.listLeafFiles (called only by HadoopFsRelation.listLeafFilesInParallel)
- HadoopFsRelation.listLeafFilesInParallel (called only by ListingFileCatalog.listLeafFiles)

It is actually very confusing and error prone because there are effectively two distinct implementations for the serial version of listing leaf files. As an example, SPARK-17599 updated only one of the code path and ignored the other one.

This code can be improved by:

- Move all file listing code into ListingFileCatalog, since it is the only class that needs this.
- Keep only one function for listing files in serial.

## How was this patch tested?
This change should be covered by existing unit and integration tests. I also moved a test case for HadoopFsRelation.shouldFilterOut from HadoopFsRelationSuite to ListingFileCatalogSuite.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #15235 from petermaxlee/SPARK-17661.
2016-10-13 14:16:39 -07:00
Tathagata Das 7106866c22 [SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public APIs changes.

### New APIs
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later)

- `StreamingQueryStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by all the sources
  - processingRate - Current rate (rows/sec) at which the query is processing data from
                                  all the sources
  - ~~outputRate~~ - *Does not work with wholestage codegen*
  - latency - Current average latency between the data being available in source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
    - latencies - getOffset, getBatch, full trigger, wal writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations

- `SourceStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by the source
  - processingRate - Current rate (rows/sec) at which the query is processing data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger

- Python API for `StreamingQuery.status()`

### Breaking changes to existing APIs
**Existing direct public facing APIs**
- Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`.
  - Branch 2.0 should have it deprecated, master should have it removed.

**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus`
   - Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status)

- Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` changed have name `queryStatus` and return type `StreamingQueryStatus`.

- Field `offsetDesc` in `SourceStatus` was Option[String], converted it to `String`.

- For `SourceStatus` and `SinkStatus` made constructor private instead of private[sql] to make them more java-safe. Instead added `private[sql] object SourceStatus/SinkStatus.apply()` which are harder to accidentally use in Java.

## How was this patch tested?

Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are is source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore, etc.

Metrics also manually tested using Ganglia sink

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15307 from tdas/SPARK-17731.
2016-10-13 13:36:26 -07:00
Pete Robbins 84f149e414 [SPARK-17827][SQL] maxColLength type should be Int for String and Binary
## What changes were proposed in this pull request?
correct the expected type from Length function to be Int

## How was this patch tested?
Test runs on little endian and big endian platforms

Author: Pete Robbins <robbinspg@gmail.com>

Closes #15464 from robbinspg/SPARK-17827.
2016-10-13 11:26:30 -07:00
Reynold Xin 04d417a7ca [SPARK-17830][SQL] Annotate remaining SQL APIs with InterfaceStability
## What changes were proposed in this pull request?
This patch annotates all the remaining APIs in SQL (excluding streaming) with InterfaceStability.

## How was this patch tested?
N/A - just annotation change.

Author: Reynold Xin <rxin@databricks.com>

Closes #15457 from rxin/SPARK-17830-2.
2016-10-13 11:12:30 -07:00
gatorsmile 0a8e51a5e4 [SPARK-17657][SQL] Disallow Users to Change Table Type
### What changes were proposed in this pull request?
Hive allows users to change the table type from `Managed` to `External` or from `External` to `Managed` by altering table's property `EXTERNAL`. See the JIRA: https://issues.apache.org/jira/browse/HIVE-1329

So far, Spark SQL does not correctly support it, although users can do it. Many assumptions are broken in the implementation. Thus, this PR is to disallow users to change it.

In addition, we also do not allow users to set the property `EXTERNAL` when creating a table.

### How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15230 from gatorsmile/alterTableSetExternal.
2016-10-13 21:36:39 +08:00
Wenchen Fan db8784feaa [SPARK-17899][SQL] add a debug mode to keep raw table properties in HiveExternalCatalog
## What changes were proposed in this pull request?

Currently `HiveExternalCatalog` will filter out the Spark SQL internal table properties, e.g. `spark.sql.sources.provider`, `spark.sql.sources.schema`, etc. This is reasonable for external users as they don't want to see these internal properties in `DESC TABLE`.

However, as a Spark developer, sometimes we do wanna see the raw table properties. This PR adds a new internal SQL conf, `spark.sql.debug`, to enable debug mode and keep these raw table properties.

This config can also be used in similar places where we wanna retain debug information in the future.

## How was this patch tested?

new test in MetastoreDataSourcesSuite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15458 from cloud-fan/debug.
2016-10-13 03:26:29 -04:00
buzhihuojie 7222a25a11 minor doc fix for Row.scala
## What changes were proposed in this pull request?

minor doc fix for "getAnyValAs" in class Row

## How was this patch tested?

None.

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: buzhihuojie <ren.weiluo@gmail.com>

Closes #15452 from david-weiluo-ren/minorDocFixForRow.
2016-10-12 22:51:54 -07:00
Liang-Chi Hsieh 064d6650e9 [SPARK-17866][SPARK-17867][SQL] Fix Dataset.dropduplicates
## What changes were proposed in this pull request?

Two issues regarding Dataset.dropduplicates:

1. Dataset.dropDuplicates should consider the columns with same column name

    We find and get the first resolved attribute from output with the given column name in `Dataset.dropDuplicates`. When we have the more than one columns with the same name. Other columns are put into aggregation columns, instead of grouping columns.

2. Dataset.dropDuplicates should not change the output of child plan

    We create new `Alias` with new exprId in `Dataset.dropDuplicates` now. However it causes problem when we want to select the columns as follows:

        val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
        // ds("_2") will cause analysis exception
        ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int])

Because the two issues are both related to `Dataset.dropduplicates` and the code changes are not big, so submitting them together as one PR.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15427 from viirya/fix-dropduplicates.
2016-10-13 13:27:57 +08:00
Burak Yavuz edeb51a39d [SPARK-17876] Write StructuredStreaming WAL to a stream instead of materializing all at once
## What changes were proposed in this pull request?

The CompactibleFileStreamLog materializes the whole metadata log in memory as a String. This can cause issues when there are lots of files that are being committed, especially during a compaction batch.
You may come across stacktraces that look like:
```
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:941)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.serialize(FileStreamSinkLog.scala:127)

```
The safer way is to write to an output stream so that we don't have to materialize a huge string.

## How was this patch tested?

Existing unit tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15437 from brkyvz/ser-to-stream.
2016-10-12 21:40:45 -07:00
Reynold Xin 6f20a92ca3 [SPARK-17845] [SQL] More self-evident window function frame boundary API
## What changes were proposed in this pull request?
This patch improves the window function frame boundary API to make it more obvious to read and to use. The two high level changes are:

1. Create Window.currentRow, Window.unboundedPreceding, Window.unboundedFollowing to indicate the special values in frame boundaries. These methods map to the special integral values so we are not breaking backward compatibility here. This change makes the frame boundaries more self-evident (instead of Long.MinValue, it becomes Window.unboundedPreceding).

2. In Python, for any value less than or equal to JVM's Long.MinValue, treat it as Window.unboundedPreceding. For any value larger than or equal to JVM's Long.MaxValue, treat it as Window.unboundedFollowing. Before this change, if the user specifies any value that is less than Long.MinValue but not -sys.maxsize (e.g. -sys.maxsize + 1), the number we pass over to the JVM would overflow, resulting in a frame that does not make sense.

Code example required to specify a frame before this patch:
```
Window.rowsBetween(-Long.MinValue, 0)
```

While the above code should still work, the new way is more obvious to read:
```
Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
```

## How was this patch tested?
- Updated DataFrameWindowSuite (for Scala/Java)
- Updated test_window_functions_cumulative_sum (for Python)
- Renamed DataFrameWindowSuite DataFrameWindowFunctionsSuite to better reflect its purpose

Author: Reynold Xin <rxin@databricks.com>

Closes #15438 from rxin/SPARK-17845.
2016-10-12 16:45:10 -07:00
Imran Rashid 9ce7d3e542 [SPARK-17675][CORE] Expand Blacklist for TaskSets
## What changes were proposed in this pull request?

This is a step along the way to SPARK-8425.

To enable incremental review, the first step proposed here is to expand the blacklisting within tasksets. In particular, this will enable blacklisting for
* (task, executor) pairs (this already exists via an undocumented config)
* (task, node)
* (taskset, executor)
* (taskset, node)

Adding (task, node) is critical to making spark fault-tolerant of one-bad disk in a cluster, without requiring careful tuning of "spark.task.maxFailures". The other additions are also important to avoid many misleading task failures and long scheduling delays when there is one bad node on a large cluster.

Note that some of the code changes here aren't really required for just this -- they put pieces in place for SPARK-8425 even though they are not used yet (eg. the `BlacklistTracker` helper is a little out of place, `TaskSetBlacklist` holds onto a little more info than it needs to for just this change, and `ExecutorFailuresInTaskSet` is more complex than it needs to be).

## How was this patch tested?

Added unit tests, run tests via jenkins.

Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>

Closes #15249 from squito/taskset_blacklist_only.
2016-10-12 16:43:03 -05:00
Shixiong Zhu 47776e7c0c [SPARK-17850][CORE] Add a flag to ignore corrupt files
## What changes were proposed in this pull request?

Add a flag to ignore corrupt files. For Spark core, the configuration is `spark.files.ignoreCorruptFiles`. For Spark SQL, it's `spark.sql.files.ignoreCorruptFiles`.

## How was this patch tested?

The added unit tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15422 from zsxwing/SPARK-17850.
2016-10-12 13:51:53 -07:00
prigarg d5580ebaa0 [SPARK-17884][SQL] To resolve Null pointer exception when casting from empty string to interval type.
## What changes were proposed in this pull request?
This change adds a check in castToInterval method of Cast expression , such that if converted value is null , then isNull variable should be set to true.

Earlier, the expression Cast(Literal(), CalendarIntervalType) was throwing NullPointerException because of the above mentioned reason.

## How was this patch tested?
Added test case in CastSuite.scala

jira entry for detail: https://issues.apache.org/jira/browse/SPARK-17884

Author: prigarg <prigarg@adobe.com>

Closes #15449 from priyankagargnitk/SPARK-17884.
2016-10-12 10:14:45 -07:00
Wenchen Fan b9a147181d [SPARK-17720][SQL] introduce static SQL conf
## What changes were proposed in this pull request?

SQLConf is session-scoped and mutable. However, we do have the requirement for a static SQL conf, which is global and immutable, e.g. the `schemaStringThreshold` in `HiveExternalCatalog`, the flag to enable/disable hive support, the global temp view database in https://github.com/apache/spark/pull/14897.

Actually we've already implemented static SQL conf implicitly via `SparkConf`, this PR just make it explicit and expose it to users, so that they can see the config value via SQL command or `SparkSession.conf`, and forbid users to set/unset static SQL conf.

## How was this patch tested?

new tests in SQLConfSuite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15295 from cloud-fan/global-conf.
2016-10-11 20:27:08 -07:00
Liang-Chi Hsieh c8c090640a [SPARK-17821][SQL] Support And and Or in Expression Canonicalize
## What changes were proposed in this pull request?

Currently `Canonicalize` object doesn't support `And` and `Or`. So we can compare canonicalized form of predicates consistently. We should add the support.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15388 from viirya/canonicalize-and-or.
2016-10-11 16:06:40 +08:00
Reynold Xin 3694ba48f0 [SPARK-17864][SQL] Mark data type APIs as stable (not DeveloperApi)
## What changes were proposed in this pull request?
The data type API has not been changed since Spark 1.3.0, and is ready for graduation. This patch marks them as stable APIs using the new InterfaceStability annotation.

This patch also looks at the various files in the catalyst module (not the "package") and marks the remaining few classes appropriately as well.

## How was this patch tested?
This is an annotation change. No functional changes.

Author: Reynold Xin <rxin@databricks.com>

Closes #15426 from rxin/SPARK-17864.
2016-10-11 15:35:52 +08:00
Wenchen Fan 7388ad94d7 [SPARK-17338][SQL][FOLLOW-UP] add global temp view
## What changes were proposed in this pull request?

address post hoc review comments for https://github.com/apache/spark/pull/14897

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15424 from cloud-fan/global-temp-view.
2016-10-11 15:21:28 +08:00
Reynold Xin b515768f26 [SPARK-17844] Simplify DataFrame API for defining frame boundaries in window functions
## What changes were proposed in this pull request?
When I was creating the example code for SPARK-10496, I realized it was pretty convoluted to define the frame boundaries for window functions when there is no partition column or ordering column. The reason is that we don't provide a way to create a WindowSpec directly with the frame boundaries. We can trivially improve this by adding rowsBetween and rangeBetween to Window object.

As an example, to compute cumulative sum using the natural ordering, before this pr:
```
df.select('key, sum("value").over(Window.partitionBy(lit(1)).rowsBetween(Long.MinValue, 0)))
```

After this pr:
```
df.select('key, sum("value").over(Window.rowsBetween(Long.MinValue, 0)))
```

Note that you could argue there is no point specifying a window frame without partitionBy/orderBy -- but it is strange that only rowsBetween and rangeBetween are not the only two APIs not available.

This also fixes https://issues.apache.org/jira/browse/SPARK-17656 (removing _root_.scala).

## How was this patch tested?
Added test cases to compute cumulative sum in DataFrameWindowSuite for Scala/Java and tests.py for Python.

Author: Reynold Xin <rxin@databricks.com>

Closes #15412 from rxin/SPARK-17844.
2016-10-10 22:33:20 -07:00
hyukjinkwon 0c0ad436ad [SPARK-17719][SPARK-17776][SQL] Unify and tie up options in a single place in JDBC datasource package
## What changes were proposed in this pull request?

This PR proposes to fix arbitrary usages among `Map[String, String]`, `Properties` and `JDBCOptions` instances for options in `execution/jdbc` package and make the connection properties exclude Spark-only options.

This PR includes some changes as below:

  - Unify `Map[String, String]`, `Properties` and `JDBCOptions` in `execution/jdbc` package to `JDBCOptions`.

- Move `batchsize`, `fetchszie`, `driver` and `isolationlevel` options into `JDBCOptions` instance.

- Document `batchSize` and `isolationlevel` with marking both read-only options and write-only options. Also, this includes minor types and detailed explanation for some statements such as url.

- Throw exceptions fast by checking arguments first rather than in execution time (e.g. for `fetchsize`).

- Exclude Spark-only options in connection properties.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15292 from HyukjinKwon/SPARK-17719.
2016-10-10 22:22:41 -07:00
hyukjinkwon 90217f9dee [SPARK-16896][SQL] Handle duplicated field names in header consistently with null or empty strings in CSV
## What changes were proposed in this pull request?

Currently, CSV datasource allows to load duplicated empty string fields or fields having `nullValue` in the header. It'd be great if this can deal with normal fields as well.

This PR proposes handling the duplicates consistently with the existing behaviour with considering case-sensitivity (`spark.sql.caseSensitive`) as below:

data below:

```
fieldA,fieldB,,FIELDA,fielda,,
1,2,3,4,5,6,7
```

is parsed as below:

```scala
spark.read.format("csv").option("header", "true").load("test.csv").show()
```

- when `spark.sql.caseSensitive` is `false` (by default).

  ```
  +-------+------+---+-------+-------+---+---+
  |fieldA0|fieldB|_c2|FIELDA3|fieldA4|_c5|_c6|
  +-------+------+---+-------+-------+---+---+
  |      1|     2|  3|      4|      5|  6|  7|
  +-------+------+---+-------+-------+---+---+
  ```

- when `spark.sql.caseSensitive` is `true`.

  ```
  +-------+------+---+-------+-------+---+---+
  |fieldA0|fieldB|_c2| FIELDA|fieldA4|_c5|_c6|
  +-------+------+---+-------+-------+---+---+
  |      1|     2|  3|      4|      5|  6|  7|
  +-------+------+---+-------+-------+---+---+
  ```

**In more details**,

There is a good reference about this problem, `read.csv()` in R. So, I initially wanted to propose the similar behaviour.

In case of R,  the CSV data below:

```
fieldA,fieldB,,fieldA,fieldA,,
1,2,3,4,5,6,7
```

is parsed as below:

```r
test <- read.csv(file="test.csv",header=TRUE,sep=",")
> test
  fieldA fieldB X fieldA.1 fieldA.2 X.1 X.2
1      1      2 3        4        5   6   7
```

However, Spark CSV datasource already is handling duplicated empty strings and `nullValue` as field names. So the data below:

```
,,,fieldA,,fieldB,
1,2,3,4,5,6,7
```

is parsed as below:

```scala
spark.read.format("csv").option("header", "true").load("test.csv").show()
```
```
+---+---+---+------+---+------+---+
|_c0|_c1|_c2|fieldA|_c4|fieldB|_c6|
+---+---+---+------+---+------+---+
|  1|  2|  3|     4|  5|     6|  7|
+---+---+---+------+---+------+---+
```

R starts the number for each duplicate but Spark adds the number for its position for all fields for `nullValue` and empty strings.

In terms of case-sensitivity, it seems R is case-sensitive as below: (it seems it is not configurable).

```
a,a,a,A,A
1,2,3,4,5
```

is parsed as below:

```r
test <- read.csv(file="test.csv",header=TRUE,sep=",")
> test
  a a.1 a.2 A A.1
1 1   2   3 4   5
```

## How was this patch tested?

Unit test in `CSVSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14745 from HyukjinKwon/SPARK-16896.
2016-10-11 10:21:22 +08:00
Davies Liu d5ec4a3e01 [SPARK-17738][TEST] Fix flaky test in ColumnTypeSuite
## What changes were proposed in this pull request?

The default buffer size is not big enough for randomly generated MapType.

## How was this patch tested?

Ran the tests in 100 times, it never fail (it fail 8 times before the patch).

Author: Davies Liu <davies@databricks.com>

Closes #15395 from davies/flaky_map.
2016-10-10 19:14:01 -07:00
Reynold Xin 689de92005 [SPARK-17830] Annotate spark.sql package with InterfaceStability
## What changes were proposed in this pull request?
This patch annotates the InterfaceStability level for top level classes in o.a.spark.sql and o.a.spark.sql.util packages, to experiment with this new annotation.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #15392 from rxin/SPARK-17830.
2016-10-10 11:29:09 -07:00
jiangxingbo 7e16c94f18
[HOT-FIX][SQL][TESTS] Remove unused function in SparkSqlParserSuite
## What changes were proposed in this pull request?

The function `SparkSqlParserSuite.createTempViewUsing` is not used for now and causes build failure, this PR simply removes it.

## How was this patch tested?
N/A

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15418 from jiangxb1987/parserSuite.
2016-10-10 13:49:25 +01:00
Wenchen Fan 23ddff4b2b [SPARK-17338][SQL] add global temp view
## What changes were proposed in this pull request?

Global temporary view is a cross-session temporary view, which means it's shared among all sessions. Its lifetime is the lifetime of the Spark application, i.e. it will be automatically dropped when the application terminates. It's tied to a system preserved database `global_temp`(configurable via SparkConf), and we must use the qualified name to refer a global temp view, e.g. SELECT * FROM global_temp.view1.

changes for `SessionCatalog`:

1. add a new field `gloabalTempViews: GlobalTempViewManager`, to access the shared global temp views, and the global temp db name.
2. `createDatabase` will fail if users wanna create `global_temp`, which is system preserved.
3. `setCurrentDatabase` will fail if users wanna set `global_temp`, which is system preserved.
4. add `createGlobalTempView`, which is used in `CreateViewCommand` to create global temp views.
5. add `dropGlobalTempView`, which is used in `CatalogImpl` to drop global temp view.
6. add `alterTempViewDefinition`, which is used in `AlterViewAsCommand` to update the view definition for local/global temp views.
7. `renameTable`/`dropTable`/`isTemporaryTable`/`lookupRelation`/`getTempViewOrPermanentTableMetadata`/`refreshTable` will handle global temp views.

changes for SQL commands:

1. `CreateViewCommand`/`AlterViewAsCommand` is updated to support global temp views
2. `ShowTablesCommand` outputs a new column `database`, which is used to distinguish global and local temp views.
3. other commands can also handle global temp views if they call `SessionCatalog` APIs which accepts global temp views, e.g. `DropTableCommand`, `AlterTableRenameCommand`, `ShowColumnsCommand`, etc.

changes for other public API

1. add a new method `dropGlobalTempView` in `Catalog`
2. `Catalog.findTable` can find global temp view
3. add a new method `createGlobalTempView` in `Dataset`

## How was this patch tested?

new tests in `SQLViewSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14897 from cloud-fan/global-temp-view.
2016-10-10 15:48:57 +08:00
jiangxingbo 16590030c1 [SPARK-17741][SQL] Grammar to parse top level and nested data fields separately
## What changes were proposed in this pull request?

Currently we use the same rule to parse top level and nested data fields. For example:
```
create table tbl_x(
  id bigint,
  nested struct<col1:string,col2:string>
)
```
Shows both syntaxes. In this PR we split this rule in a top-level and nested rule.

Before this PR,
```
sql("CREATE TABLE my_tab(column1: INT)")
```
works fine.
After this PR, it will throw a `ParseException`:
```
scala> sql("CREATE TABLE my_tab(column1: INT)")
org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'CREATE TABLE my_tab(column1:'(line 1, pos 27)
```

## How was this patch tested?
Add new testcases in `SparkSqlParserSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15346 from jiangxb1987/cdt.
2016-10-09 22:00:54 -07:00
jiangxingbo 26fbca4806 [SPARK-17832][SQL] TableIdentifier.quotedString creates un-parseable names when name contains a backtick
## What changes were proposed in this pull request?

The `quotedString` method in `TableIdentifier` and `FunctionIdentifier` produce an illegal (un-parseable) name when the name contains a backtick. For example:
```
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
val complexName = TableIdentifier("`weird`table`name", Some("`d`b`1"))
parseTableIdentifier(complexName.unquotedString) // Does not work
parseTableIdentifier(complexName.quotedString) // Does not work
parseExpression(complexName.unquotedString) // Does not work
parseExpression(complexName.quotedString) // Does not work
```
We should handle the backtick properly to make `quotedString` parseable.

## How was this patch tested?
Add new testcases in `TableIdentifierParserSuite` and `ExpressionParserSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15403 from jiangxb1987/backtick.
2016-10-09 21:52:46 -07:00
Weiqing Yang 8a6bbe095b
[MINOR][SQL] Use resource path for test_script.sh
## What changes were proposed in this pull request?
This PR modified the test case `test("script")` to use resource path for `test_script.sh`. Make the test case portable (even in IntelliJ).

## How was this patch tested?
Passed the test case.
Before:
Run `test("script")` in IntelliJ:
```
Caused by: org.apache.spark.SparkException: Subprocess exited with status 127. Error: bash: src/test/resources/test_script.sh: No such file or directory
```
After:
Test passed.

Author: Weiqing Yang <yangweiqing001@gmail.com>

Closes #15246 from weiqingy/hivetest.
2016-10-08 12:12:35 +01:00
hyukjinkwon 24850c9415 [HOTFIX][BUILD] Do not use contains in Option in JdbcRelationProvider
## What changes were proposed in this pull request?

This PR proposes the fix the use of `contains` API which only exists from Scala 2.11.

## How was this patch tested?

Manually checked:

```scala
scala> val o: Option[Boolean] = None
o: Option[Boolean] = None

scala> o == Some(false)
res17: Boolean = false

scala> val o: Option[Boolean] = Some(true)
o: Option[Boolean] = Some(true)

scala> o == Some(false)
res18: Boolean = false

scala> val o: Option[Boolean] = Some(false)
o: Option[Boolean] = Some(false)

scala> o == Some(false)
res19: Boolean = true
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15393 from HyukjinKwon/hotfix.
2016-10-07 17:59:24 -07:00
Davies Liu 94b24b84a6 [SPARK-17806] [SQL] fix bug in join key rewritten in HashJoin
## What changes were proposed in this pull request?

In HashJoin, we try to rewrite the join key as Long to improve the performance of finding a match. The rewriting part is not well tested, has a bug that could cause wrong result when there are at least three integral columns in the joining key also the total length of the key exceed 8 bytes.

## How was this patch tested?

Added unit test to covering the rewriting with different number of columns and different data types. Manually test the reported case and confirmed that this PR fix the bug.

Author: Davies Liu <davies@databricks.com>

Closes #15390 from davies/rewrite_key.
2016-10-07 15:03:47 -07:00
Herman van Hovell 97594c29b7 [SPARK-17761][SQL] Remove MutableRow
## What changes were proposed in this pull request?
In practice we cannot guarantee that an `InternalRow` is immutable. This makes the `MutableRow` almost redundant. This PR folds `MutableRow` into `InternalRow`.

The code below illustrates the immutability issue with InternalRow:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
val struct = new GenericMutableRow(1)
val row = InternalRow(struct, 1)
println(row)
scala> [[null], 1]
struct.setInt(0, 42)
println(row)
scala> [[42], 1]
```

This might be somewhat controversial, so feedback is appreciated.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15333 from hvanhovell/SPARK-17761.
2016-10-07 14:03:45 -07:00
Davies Liu 2badb58cdd [SPARK-15621][SQL] Support spilling for Python UDF
## What changes were proposed in this pull request?

When execute a Python UDF, we buffer the input row into as queue, then pull them out to join with the result from Python UDF. In the case that Python UDF is slow or the input row is too wide, we could ran out of memory because of the queue. Since we can't flush all the buffers (sockets) between JVM and Python process from JVM side, we can't limit the rows in the queue, otherwise it could deadlock.

This PR will manage the memory used by the queue, spill that into disk when there is no enough memory (also release the memory and disk space as soon as possible).

## How was this patch tested?

Added unit tests. Also manually ran a workload with large input row and slow python UDF (with  large broadcast) like this:

```
b = range(1<<24)
add = udf(lambda x: x + len(b), IntegerType())
df = sqlContext.range(1, 1<<26, 1, 4)
print df.select(df.id, lit("adf"*10000).alias("s"), add(df.id).alias("add")).groupBy(length("s")).sum().collect()
```

It ran out of memory (hang because of full GC) before the patch, ran smoothly after the patch.

Author: Davies Liu <davies@databricks.com>

Closes #15089 from davies/spill_udf.
2016-10-07 13:45:00 -07:00
Prashant Sharma bb1aaf28ec [SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.
## What changes were proposed in this pull request?

Adds the textFile API which exists in DataFrameReader and serves same purpose.

## How was this patch tested?

Added corresponding testcase.

Author: Prashant Sharma <prashsh1@in.ibm.com>

Closes #14087 from ScrapCodes/textFile.
2016-10-07 11:16:24 -07:00
hyukjinkwon aa3a6841eb [SPARK-14525][SQL][FOLLOWUP] Clean up JdbcRelationProvider
## What changes were proposed in this pull request?

This PR proposes cleaning up the confusing part in `createRelation` as discussed in https://github.com/apache/spark/pull/12601/files#r80627940

Also, this PR proposes the changes below:

 - Add documentation for `batchsize` and `isolationLevel`.
 - Move property names into `JDBCOptions` so that they can be managed in a single place. which were, `fetchsize`, `batchsize`, `isolationLevel` and `driver`.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15263 from HyukjinKwon/SPARK-14525.
2016-10-07 10:52:32 -07:00
Sean Owen cff5607552 [SPARK-17707][WEBUI] Web UI prevents spark-submit application to be finished
## What changes were proposed in this pull request?

This expands calls to Jetty's simple `ServerConnector` constructor to explicitly specify a `ScheduledExecutorScheduler` that makes daemon threads. It should otherwise result in exactly the same configuration, because the other args are copied from the constructor that is currently called.

(I'm not sure we should change the Hive Thriftserver impl, but I did anyway.)

This also adds `sc.stop()` to the quick start guide example.

## How was this patch tested?

Existing tests; _pending_ at least manual verification of the fix.

Author: Sean Owen <sowen@cloudera.com>

Closes #15381 from srowen/SPARK-17707.
2016-10-07 10:31:41 -07:00
hyukjinkwon 2b01d3c701
[SPARK-16960][SQL] Deprecate approxCountDistinct, toDegrees and toRadians according to FunctionRegistry
## What changes were proposed in this pull request?

It seems `approxCountDistinct`, `toDegrees` and `toRadians` are also missed while matching the names to the ones in `FunctionRegistry`. (please see [approx_count_distinct](5c2ae79bfc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L244)), [degrees](5c2ae79bfc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L203)) and [radians](5c2ae79bfc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L222)) in `FunctionRegistry`).

I took a scan between `functions.scala` and `FunctionRegistry` and it seems these are all left. For `countDistinct` and `sumDistinct`, they are not registered in `FunctionRegistry`.

This PR deprecates `approxCountDistinct`, `toDegrees` and `toRadians` and introduces `approx_count_distinct`, `degrees` and `radians`.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>

Closes #14538 from HyukjinKwon/SPARK-16588-followup.
2016-10-07 11:49:34 +01:00
Shixiong Zhu 9a48e60e63 [SPARK-17780][SQL] Report Throwable to user in StreamExecution
## What changes were proposed in this pull request?

When using an incompatible source for structured streaming, it may throw NoClassDefFoundError. It's better to just catch Throwable and report it to the user since the streaming thread is dying.

## How was this patch tested?

`test("NoClassDefFoundError from an incompatible source")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15352 from zsxwing/SPARK-17780.
2016-10-06 12:51:12 -07:00
Reynold Xin 79accf45ac [SPARK-17798][SQL] Remove redundant Experimental annotations in sql.streaming
## What changes were proposed in this pull request?
I was looking through API annotations to catch mislabeled APIs, and realized DataStreamReader and DataStreamWriter classes are already annotated as Experimental, and as a result there is no need to annotate each method within them.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #15373 from rxin/SPARK-17798.
2016-10-06 10:33:45 -07:00
Dongjoon Hyun 92b7e57280 [SPARK-17750][SQL] Fix CREATE VIEW with INTERVAL arithmetic.
## What changes were proposed in this pull request?

Currently, Spark raises `RuntimeException` when creating a view with timestamp with INTERVAL arithmetic like the following. The root cause is the arithmetic expression, `TimeAdd`, was transformed into `timeadd` function as a VIEW definition. This PR fixes the SQL definition of `TimeAdd` and `TimeSub` expressions.

```scala
scala> sql("CREATE TABLE dates (ts TIMESTAMP)")

scala> sql("CREATE VIEW view1 AS SELECT ts + INTERVAL 1 DAY FROM dates")
java.lang.RuntimeException: Failed to analyze the canonicalized SQL: ...
```

## How was this patch tested?

Pass Jenkins with a new testcase.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15318 from dongjoon-hyun/SPARK-17750.
2016-10-06 09:42:30 -07:00
Shixiong Zhu b678e465af [SPARK-17346][SQL][TEST-MAVEN] Generate the sql test jar to fix the maven build
## What changes were proposed in this pull request?

Generate the sql test jar to fix the maven build

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15368 from zsxwing/sql-test-jar.
2016-10-05 18:11:31 -07:00
Shixiong Zhu 9293734d35 [SPARK-17346][SQL] Add Kafka source for Structured Streaming
## What changes were proposed in this pull request?

This PR adds a new project ` external/kafka-0-10-sql` for Structured Streaming Kafka source.

It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing

tdas did most of work and part of them was inspired by koeninger's work.

### Introduction

The Kafka source is a structured streaming data source to poll data from Kafka. The schema of reading data is as follows:

Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int

The source can deal with deleting topics. However, the user should make sure there is no Spark job processing the data when deleting a topic.

### Configuration

The user can use `DataStreamReader.option` to set the following configurations.

Kafka Source's options | value | default | meaning
------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest" which is from the earliest offset, or "latest" which is just from the latest offset. Note: This only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off.
failOnDataLost | [true, false] | true | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe. Only one of "subscribe" and "subscribeParttern" options can be specified for Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe the topic. Only one of "subscribe" and "subscribeParttern" options can be specified for Kafka source.
kafka.consumer.poll.timeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fatch Kafka latest offsets.
fetchOffset.retryIntervalMs | long | 10 | milliseconds to wait before retrying to fetch Kafka offsets

Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, `stream.option("kafka.bootstrap.servers", "host:port")`

### Usage

* Subscribe to 1 topic
```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
```

* Subscribe to multiple topics
```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1,topic2")
  .load()
```

* Subscribe to a pattern
```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribePattern", "topic.*")
  .load()
```

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>

Closes #15102 from zsxwing/kafka-source.
2016-10-05 16:45:45 -07:00
Herman van Hovell 5fd54b994e [SPARK-17758][SQL] Last returns wrong result in case of empty partition
## What changes were proposed in this pull request?
The result of the `Last` function can be wrong when the last partition processed is empty. It can return `null` instead of the expected value. For example, this can happen when we process partitions in the following order:
```
- Partition 1 [Row1, Row2]
- Partition 2 [Row3]
- Partition 3 []
```
In this case the `Last` function will currently return a null, instead of the value of `Row3`.

This PR fixes this by adding a `valueSet` flag to the `Last` function.

## How was this patch tested?
We only used end to end tests for `DeclarativeAggregateFunction`s. I have added an evaluator for these functions so we can tests them in catalyst. I have added a `LastTestSuite` to test the `Last` aggregate function.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15348 from hvanhovell/SPARK-17758.
2016-10-05 16:05:30 -07:00
Dongjoon Hyun 6a05eb24d0 [SPARK-17328][SQL] Fix NPE with EXPLAIN DESCRIBE TABLE
## What changes were proposed in this pull request?

This PR fixes the following NPE scenario in two ways.

**Reported Error Scenario**
```scala
scala> sql("EXPLAIN DESCRIBE TABLE x").show(truncate = false)
INFO SparkSqlParser: Parsing command: EXPLAIN DESCRIBE TABLE x
java.lang.NullPointerException
```

- **DESCRIBE**: Extend `DESCRIBE` syntax to accept `TABLE`.
- **EXPLAIN**: Prevent NPE in case of the parsing failure of target statement, e.g., `EXPLAIN DESCRIBE TABLES x`.

## How was this patch tested?

Pass the Jenkins test with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15357 from dongjoon-hyun/SPARK-17328.
2016-10-05 10:52:43 -07:00
Herman van Hovell 89516c1c4a [SPARK-17258][SQL] Parse scientific decimal literals as decimals
## What changes were proposed in this pull request?
Currently Spark SQL parses regular decimal literals (e.g. `10.00`) as decimals and scientific decimal literals (e.g. `10.0e10`) as doubles. The difference between the two confuses most users. This PR unifies the parsing behavior and also parses scientific decimal literals as decimals.

This implications in tests are limited to a single Hive compatibility test.

## How was this patch tested?
Updated tests in `ExpressionParserSuite` and `SQLQueryTestSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #14828 from hvanhovell/SPARK-17258.
2016-10-04 23:48:26 -07:00
Tejas Patil a99743d053 [SPARK-17495][SQL] Add Hash capability semantically equivalent to Hive's
## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-17495

Spark internally uses Murmur3Hash for partitioning. This is different from the one used by Hive. For queries which use bucketing this leads to different results if one tries the same query on both engines. For us, we want users to have backward compatibility to that one can switch parts of applications across the engines without observing regressions.

This PR includes `HiveHash`, `HiveHashFunction`, `HiveHasher` which mimics Hive's hashing at https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java#L638

I am intentionally not introducing any usages of this hash function in rest of the code to keep this PR small. My eventual goal is to have Hive bucketing support in Spark. Once this PR gets in, I will make hash function pluggable in relevant areas (eg. `HashPartitioning`'s `partitionIdExpression` has Murmur3 hardcoded : https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala#L265)

## How was this patch tested?

Added `HiveHashSuite`

Author: Tejas Patil <tejasp@fb.com>

Closes #15047 from tejasapatil/SPARK-17495_hive_hash.
2016-10-04 18:59:31 -07:00
Marcelo Vanzin 8d969a2125 [SPARK-17549][SQL] Only collect table size stat in driver for cached relation.
This reverts commit 9ac68dbc57. Turns out
the original fix was correct.

Original change description:
The existing code caches all stats for all columns for each partition
in the driver; for a large relation, this causes extreme memory usage,
which leads to gc hell and application failures.

It seems that only the size in bytes of the data is actually used in the
driver, so instead just colllect that. In executors, the full stats are
still kept, but that's not a big problem; we expect the data to be distributed
and thus not really incur in too much memory pressure in each individual
executor.

There are also potential improvements on the executor side, since the data
being stored currently is very wasteful (e.g. storing boxed types vs.
primitive types for stats). But that's a separate issue.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #15304 from vanzin/SPARK-17549.2.
2016-10-04 09:38:44 -07:00
sumansomasundar 7d51608835
[SPARK-16962][CORE][SQL] Fix misaligned record accesses for SPARC architectures
## What changes were proposed in this pull request?

Made changes to record length offsets to make them uniform throughout various areas of Spark core and unsafe

## How was this patch tested?

This change affects only SPARC architectures and was tested on X86 architectures as well for regression.

Author: sumansomasundar <suman.somasundar@oracle.com>

Closes #14762 from sumansomasundar/master.
2016-10-04 10:31:56 +01:00
Ergin Seyfe d2dc8c4a16 [SPARK-17773] Input/Output] Add VoidObjectInspector
## What changes were proposed in this pull request?
Added VoidObjectInspector to the list of PrimitiveObjectInspectors

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
Executing following query was failing.
select SOME_UDAF*(a.arr)
from (
select Array(null) as arr from dim_one_row
) a

After the fix, I am getting the correct output:
res0: Array[org.apache.spark.sql.Row] = Array([null])

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #15337 from seyfe/add_void_object_inspector.
2016-10-03 23:28:39 -07:00
Takuya UESHIN b1b47274bf [SPARK-17702][SQL] Code generation including too many mutable states exceeds JVM size limit.
## What changes were proposed in this pull request?

Code generation including too many mutable states exceeds JVM size limit to extract values from `references` into fields in the constructor.
We should split the generated extractions in the constructor into smaller functions.

## How was this patch tested?

I added some tests to check if the generated codes for the expressions exceed or not.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #15275 from ueshin/issues/SPARK-17702.
2016-10-03 21:48:58 -07:00
Dongjoon Hyun c571cfb2d0 [SPARK-17112][SQL] "select null" via JDBC triggers IllegalArgumentException in Thriftserver
## What changes were proposed in this pull request?

Currently, Spark Thrift Server raises `IllegalArgumentException` for queries whose column types are `NullType`, e.g., `SELECT null` or `SELECT if(true,null,null)`. This PR fixes that by returning `void` like Hive 1.2.

**Before**
```sql
$ bin/beeline -u jdbc:hive2://localhost:10000 -e "select null"
Connecting to jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 2.1.0-SNAPSHOT)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Error: java.lang.IllegalArgumentException: Unrecognized type name: null (state=,code=0)
Closing: 0: jdbc:hive2://localhost:10000

$ bin/beeline -u jdbc:hive2://localhost:10000 -e "select if(true,null,null)"
Connecting to jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 2.1.0-SNAPSHOT)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Error: java.lang.IllegalArgumentException: Unrecognized type name: null (state=,code=0)
Closing: 0: jdbc:hive2://localhost:10000
```

**After**
```sql
$ bin/beeline -u jdbc:hive2://localhost:10000 -e "select null"
Connecting to jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 2.1.0-SNAPSHOT)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
+-------+--+
| NULL  |
+-------+--+
| NULL  |
+-------+--+
1 row selected (3.242 seconds)
Beeline version 1.2.1.spark2 by Apache Hive
Closing: 0: jdbc:hive2://localhost:10000

$ bin/beeline -u jdbc:hive2://localhost:10000 -e "select if(true,null,null)"
Connecting to jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 2.1.0-SNAPSHOT)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
+-------------------------+--+
| (IF(true, NULL, NULL))  |
+-------------------------+--+
| NULL                    |
+-------------------------+--+
1 row selected (0.201 seconds)
Beeline version 1.2.1.spark2 by Apache Hive
Closing: 0: jdbc:hive2://localhost:10000
```

## How was this patch tested?

* Pass the Jenkins test with a new testsuite.
* Also, Manually, after starting Spark Thrift Server, run the following command.
```sql
$ bin/beeline -u jdbc:hive2://localhost:10000 -e "select null"
$ bin/beeline -u jdbc:hive2://localhost:10000 -e "select if(true,null,null)"
```

**Hive 1.2**
```sql
hive> create table null_table as select null;
hive> desc null_table;
OK
_c0                     void
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15325 from dongjoon-hyun/SPARK-17112.
2016-10-03 21:28:16 -07:00
Herman van Hovell 2bbecdec20 [SPARK-17753][SQL] Allow a complex expression as the input a value based case statement
## What changes were proposed in this pull request?
We currently only allow relatively simple expressions as the input for a value based case statement. Expressions like `case (a > 1) or (b = 2) when true then 1 when false then 0 end` currently fail. This PR adds support for such expressions.

## How was this patch tested?
Added a test to the ExpressionParserSuite.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15322 from hvanhovell/SPARK-17753.
2016-10-03 19:32:59 -07:00
Zhenhua Wang 7bf9212764 [SPARK-17073][SQL] generate column-level statistics
## What changes were proposed in this pull request?

Generate basic column statistics for all the atomic types:
- numeric types: max, min, num of nulls, ndv (number of distinct values)
- date/timestamp types: they are also represented as numbers internally, so they have the same stats as above.
- string: avg length, max length, num of nulls, ndv
- binary: avg length, max length, num of nulls
- boolean: num of nulls, num of trues, num of falsies

Also support storing and loading these statistics.

One thing to notice:
We support analyzing columns independently, e.g.:
sql1: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key;`
sql2: `ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS value;`
when running sql2 to collect column stats for `value`, we don’t remove stats of columns `key` which are analyzed in sql1 and not in sql2. As a result, **users need to guarantee consistency** between sql1 and sql2. If the table has been changed before sql2, users should re-analyze column `key` when they want to analyze column `value`:
`ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS key, value;`

## How was this patch tested?

add unit tests

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #15090 from wzhfy/colStats.
2016-10-03 10:12:02 -07:00
Tao LI 76dc2d9073 [SPARK-14914][CORE][SQL] Skip/fix some test cases on Windows due to limitation of Windows
## What changes were proposed in this pull request?

This PR proposes to fix/skip some tests failed on Windows. This PR takes over https://github.com/apache/spark/pull/12696.

**Before**

- **SparkSubmitSuite**

  ```
[info] - launch simple application with spark-submit *** FAILED *** (202 milliseconds)
[info]   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specifie

[info] - includes jars passed in through --jars *** FAILED *** (1 second, 625 milliseconds)
[info]   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified
```

- **DiskStoreSuite**

  ```
[info] - reads of memory-mapped and non memory-mapped files are equivalent *** FAILED *** (1 second, 78 milliseconds)
[info]   diskStoreMapped.remove(blockId) was false (DiskStoreSuite.scala:41)
```

**After**

- **SparkSubmitSuite**

  ```
[info] - launch simple application with spark-submit (578 milliseconds)
[info] - includes jars passed in through --jars (1 second, 875 milliseconds)
```

- **DiskStoreSuite**

  ```
[info] DiskStoreSuite:
[info] - reads of memory-mapped and non memory-mapped files are equivalent !!! CANCELED !!! (766 milliseconds
```

For `CreateTableAsSelectSuite` and `FsHistoryProviderSuite`, I could not reproduce as the Java version seems higher than the one that has the bugs about `setReadable(..)` and `setWritable(...)` but as they are bugs reported clearly, it'd be sensible to skip those. We should revert the changes for both back as soon as we drop the support of Java 7.

## How was this patch tested?

Manually tested via AppVeyor.

Closes #12696

Author: Tao LI <tl@microsoft.com>
Author: U-FAREAST\tl <tl@microsoft.com>
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15320 from HyukjinKwon/SPARK-14914.
2016-10-02 16:01:02 -07:00
Sital Kedia f8d7fade4b [SPARK-17509][SQL] When wrapping catalyst datatype to Hive data type avoid…
## What changes were proposed in this pull request?

When wrapping catalyst datatypes to Hive data type, wrap function was doing an expensive pattern matching which was consuming around 11% of cpu time. Avoid the pattern matching by returning the wrapper only once and reuse it.

## How was this patch tested?

Tested by running the job on cluster and saw around 8% cpu improvements.

Author: Sital Kedia <skedia@fb.com>

Closes #15064 from sitalkedia/skedia/hive_wrapper.
2016-10-02 15:47:36 -07:00
Herman van Hovell af6ece33d3 [SPARK-17717][SQL] Add Exist/find methods to Catalog [FOLLOW-UP]
## What changes were proposed in this pull request?
We added find and exists methods for Databases, Tables and Functions to the user facing Catalog in PR https://github.com/apache/spark/pull/15301. However, it was brought up that the semantics of the  `find` methods are more in line a `get` method (get an object or else fail). So we rename these in this PR.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15308 from hvanhovell/SPARK-17717-2.
2016-10-01 00:50:16 -07:00
Eric Liang 4bcd9b728b [SPARK-17740] Spark tests should mock / interpose HDFS to ensure that streams are closed
## What changes were proposed in this pull request?

As a followup to SPARK-17666, ensure filesystem connections are not leaked at least in unit tests. This is done here by intercepting filesystem calls as suggested by JoshRosen . At the end of each test, we assert no filesystem streams are left open.

This applies to all tests using SharedSQLContext or SharedSparkContext.

## How was this patch tested?

I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting https://github.com/apache/spark/pull/15245 causes many actual test failures due to connection leaks.

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15306 from ericl/sc-4672.
2016-09-30 23:51:36 -07:00
Dongjoon Hyun aef506e39a [SPARK-17739][SQL] Collapse adjacent similar Window operators
## What changes were proposed in this pull request?

Currently, Spark does not collapse adjacent windows with the same partitioning and sorting. This PR implements `CollapseWindow` optimizer to do the followings.

1. If the partition specs and order specs are the same, collapse into the parent.
2. If the partition specs are the same and one order spec is a prefix of the other, collapse to the more specific one.

For example:
```scala
val df = spark.range(1000).select($"id" % 100 as "grp", $"id", rand() as "col1", rand() as "col2")

// Add summary statistics for all columns
import org.apache.spark.sql.expressions.Window
val cols = Seq("id", "col1", "col2")
val window = Window.partitionBy($"grp").orderBy($"id")
val result = cols.foldLeft(df) { (base, name) =>
  base.withColumn(s"${name}_avg", avg(col(name)).over(window))
      .withColumn(s"${name}_stddev", stddev(col(name)).over(window))
      .withColumn(s"${name}_min", min(col(name)).over(window))
      .withColumn(s"${name}_max", max(col(name)).over(window))
}
```

**Before**
```scala
scala> result.explain
== Physical Plan ==
Window [max(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#234], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [min(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#216], [grp#17L], [id#14L ASC NULLS FIRST]
   +- Window [stddev_samp(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#191], [grp#17L], [id#14L ASC NULLS FIRST]
      +- Window [avg(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#167], [grp#17L], [id#14L ASC NULLS FIRST]
         +- Window [max(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#152], [grp#17L], [id#14L ASC NULLS FIRST]
            +- Window [min(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#138], [grp#17L], [id#14L ASC NULLS FIRST]
               +- Window [stddev_samp(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#117], [grp#17L], [id#14L ASC NULLS FIRST]
                  +- Window [avg(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#97], [grp#17L], [id#14L ASC NULLS FIRST]
                     +- Window [max(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#86L], [grp#17L], [id#14L ASC NULLS FIRST]
                        +- Window [min(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#76L], [grp#17L], [id#14L ASC NULLS FIRST]
                           +- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, id_stddev#42]
                              +- Window [stddev_samp(_w0#59) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#42], [grp#17L], [id#14L ASC NULLS FIRST]
                                 +- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, cast(id#14L as double) AS _w0#59]
                                    +- Window [avg(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#26], [grp#17L], [id#14L ASC NULLS FIRST]
                                       +- *Sort [grp#17L ASC NULLS FIRST, id#14L ASC NULLS FIRST], false, 0
                                          +- Exchange hashpartitioning(grp#17L, 200)
                                             +- *Project [(id#14L % 100) AS grp#17L, id#14L, rand(-6329949029880411066) AS col1#18, rand(-7251358484380073081) AS col2#19]
                                                +- *Range (0, 1000, step=1, splits=Some(8))
```

**After**
```scala
scala> result.explain
== Physical Plan ==
Window [max(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#220, min(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#202, stddev_samp(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#177, avg(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#153, max(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#138, min(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#124, stddev_samp(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#103, avg(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#83, max(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#72L, min(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#62L], [grp#3L], [id#0L ASC NULLS FIRST]
+- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, id_stddev#28]
   +- Window [stddev_samp(_w0#45) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#28], [grp#3L], [id#0L ASC NULLS FIRST]
      +- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, cast(id#0L as double) AS _w0#45]
         +- Window [avg(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#12], [grp#3L], [id#0L ASC NULLS FIRST]
            +- *Sort [grp#3L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(grp#3L, 200)
                  +- *Project [(id#0L % 100) AS grp#3L, id#0L, rand(6537478539664068821) AS col1#4, rand(-8961093871295252795) AS col2#5]
                     +- *Range (0, 1000, step=1, splits=Some(8))
```

## How was this patch tested?

Pass the Jenkins tests with a newly added testsuite.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15317 from dongjoon-hyun/SPARK-17739.
2016-09-30 21:05:06 -07:00
Takuya UESHIN 81455a9cd9 [SPARK-17703][SQL] Add unnamed version of addReferenceObj for minor objects.
## What changes were proposed in this pull request?

There are many minor objects in references, which are extracted to the generated class field, e.g. `errMsg` in `GetExternalRowField` or `ValidateExternalType`, but number of fields in class is limited so we should reduce the number.
This pr adds unnamed version of `addReferenceObj` for these minor objects not to store the object into field but refer it from the `references` field at the time of use.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #15276 from ueshin/issues/SPARK-17703.
2016-09-30 17:31:59 -07:00
Davies Liu f327e16863 [SPARK-17738] [SQL] fix ARRAY/MAP in columnar cache
## What changes were proposed in this pull request?

The actualSize() of array and map is different from the actual size, the header is Int, rather than Long.

## How was this patch tested?

The flaky test should be fixed.

Author: Davies Liu <davies@databricks.com>

Closes #15305 from davies/fix_MAP.
2016-09-30 09:59:12 -07:00
Herman van Hovell 74ac1c4381 [SPARK-17717][SQL] Add exist/find methods to Catalog.
## What changes were proposed in this pull request?
The current user facing catalog does not implement methods for checking object existence or finding objects. You could theoretically do this using the `list*` commands, but this is rather cumbersome and can actually be costly when there are many objects. This PR adds `exists*` and `find*` methods for Databases, Table and Functions.

## How was this patch tested?
Added tests to `org.apache.spark.sql.internal.CatalogSuite`

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15301 from hvanhovell/SPARK-17717.
2016-09-29 17:56:32 -07:00
Dongjoon Hyun 4ecc648ad7 [SPARK-17612][SQL] Support DESCRIBE table PARTITION SQL syntax
## What changes were proposed in this pull request?

This PR implements `DESCRIBE table PARTITION` SQL Syntax again. It was supported until Spark 1.6.2, but was dropped since 2.0.0.

**Spark 1.6.2**
```scala
scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
res1: org.apache.spark.sql.DataFrame = [result: string]

scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
res2: org.apache.spark.sql.DataFrame = [result: string]

scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false)
+----------------------------------------------------------------+
|result                                                          |
+----------------------------------------------------------------+
|a                      string                                   |
|b                      int                                      |
|c                      string                                   |
|d                      string                                   |
|                                                                |
|# Partition Information                                         |
|# col_name             data_type               comment          |
|                                                                |
|c                      string                                   |
|d                      string                                   |
+----------------------------------------------------------------+
```

**Spark 2.0**
- **Before**
```scala
scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
res1: org.apache.spark.sql.DataFrame = []

scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false)
org.apache.spark.sql.catalyst.parser.ParseException:
Unsupported SQL statement
```

- **After**
```scala
scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
res1: org.apache.spark.sql.DataFrame = []

scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false)
+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|a                      |string   |null   |
|b                      |int      |null   |
|c                      |string   |null   |
|d                      |string   |null   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|c                      |string   |null   |
|d                      |string   |null   |
+-----------------------+---------+-------+

scala> sql("DESC EXTENDED partitioned_table PARTITION (c='Us', d=1)").show(100,false)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-------+
|col_name                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |data_type|comment|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-------+
|a                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |string   |null   |
|b                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |int      |null   |
|c                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |string   |null   |
|d                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |string   |null   |
|# Partition Information                                                                                                                                                                                                                                                                                                                                                                                                                                                            |         |       |
|# col_name                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |data_type|comment|
|c                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |string   |null   |
|d                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |string   |null   |
|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |         |       |
|Detailed Partition Information CatalogPartition(
        Partition Values: [Us, 1]
        Storage(Location: file:/Users/dhyun/SPARK-17612-DESC-PARTITION/spark-warehouse/partitioned_table/c=Us/d=1, InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, Serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Properties: [serialization.format=1])
        Partition Parameters:{transient_lastDdlTime=1475001066})|         |       |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-------+

scala> sql("DESC FORMATTED partitioned_table PARTITION (c='Us', d=1)").show(100,false)
+--------------------------------+---------------------------------------------------------------------------------------+-------+
|col_name                        |data_type                                                                              |comment|
+--------------------------------+---------------------------------------------------------------------------------------+-------+
|a                               |string                                                                                 |null   |
|b                               |int                                                                                    |null   |
|c                               |string                                                                                 |null   |
|d                               |string                                                                                 |null   |
|# Partition Information         |                                                                                       |       |
|# col_name                      |data_type                                                                              |comment|
|c                               |string                                                                                 |null   |
|d                               |string                                                                                 |null   |
|                                |                                                                                       |       |
|# Detailed Partition Information|                                                                                       |       |
|Partition Value:                |[Us, 1]                                                                                |       |
|Database:                       |default                                                                                |       |
|Table:                          |partitioned_table                                                                      |       |
|Location:                       |file:/Users/dhyun/SPARK-17612-DESC-PARTITION/spark-warehouse/partitioned_table/c=Us/d=1|       |
|Partition Parameters:           |                                                                                       |       |
|  transient_lastDdlTime         |1475001066                                                                             |       |
|                                |                                                                                       |       |
|# Storage Information           |                                                                                       |       |
|SerDe Library:                  |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe                                     |       |
|InputFormat:                    |org.apache.hadoop.mapred.TextInputFormat                                               |       |
|OutputFormat:                   |org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat                             |       |
|Compressed:                     |No                                                                                     |       |
|Storage Desc Parameters:        |                                                                                       |       |
|  serialization.format          |1                                                                                      |       |
+--------------------------------+---------------------------------------------------------------------------------------+-------+
```

## How was this patch tested?

Pass the Jenkins tests with a new testcase.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15168 from dongjoon-hyun/SPARK-17612.
2016-09-29 15:30:18 -07:00
Liang-Chi Hsieh 566d7f2827 [SPARK-17653][SQL] Remove unnecessary distincts in multiple unions
## What changes were proposed in this pull request?

Currently for `Union [Distinct]`, a `Distinct` operator is necessary to be on the top of `Union`. Once there are adjacent `Union [Distinct]`,  there will be multiple `Distinct` in the query plan.

E.g.,

For a query like: select 1 a union select 2 b union select 3 c

Before this patch, its physical plan looks like:

    *HashAggregate(keys=[a#13], functions=[])
    +- Exchange hashpartitioning(a#13, 200)
       +- *HashAggregate(keys=[a#13], functions=[])
          +- Union
             :- *HashAggregate(keys=[a#13], functions=[])
             :  +- Exchange hashpartitioning(a#13, 200)
             :     +- *HashAggregate(keys=[a#13], functions=[])
             :        +- Union
             :           :- *Project [1 AS a#13]
             :           :  +- Scan OneRowRelation[]
             :           +- *Project [2 AS b#14]
             :              +- Scan OneRowRelation[]
             +- *Project [3 AS c#15]
                +- Scan OneRowRelation[]

Only the top distinct should be necessary.

After this patch, the physical plan looks like:

    *HashAggregate(keys=[a#221], functions=[], output=[a#221])
    +- Exchange hashpartitioning(a#221, 5)
       +- *HashAggregate(keys=[a#221], functions=[], output=[a#221])
          +- Union
             :- *Project [1 AS a#221]
             :  +- Scan OneRowRelation[]
             :- *Project [2 AS b#222]
             :  +- Scan OneRowRelation[]
             +- *Project [3 AS c#223]
                +- Scan OneRowRelation[]

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15238 from viirya/remove-extra-distinct-union.
2016-09-29 14:30:23 -07:00
Michael Armbrust fe33121a53 [SPARK-17699] Support for parsing JSON string columns
Spark SQL has great support for reading text files that contain JSON data.  However, in many cases the JSON data is just one column amongst others.  This is particularly true when reading from sources such as Kafka.  This PR adds a new functions `from_json` that converts a string column into a nested `StructType` with a user specified schema.

Example usage:
```scala
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as 'json) // => [json: <a: int>]
```

This PR adds support for java, scala and python.  I leveraged our existing JSON parsing support by moving it into catalyst (so that we could define expressions using it).  I left SQL out for now, because I'm not sure how users would specify a schema.

Author: Michael Armbrust <michael@databricks.com>

Closes #15274 from marmbrus/jsonParser.
2016-09-29 13:01:10 -07:00
Sean Owen b35b0dbbfa
[SPARK-17614][SQL] sparkSession.read() .jdbc(***) use the sql syntax "where 1=0" that Cassandra does not support
## What changes were proposed in this pull request?

Use dialect's table-exists query rather than hard-coded WHERE 1=0 query

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15196 from srowen/SPARK-17614.
2016-09-29 08:24:34 -04:00
Josh Rosen 37eb9184f1 [SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates
## What changes were proposed in this pull request?

This patch fixes a minor correctness issue impacting the pushdown of filters beneath aggregates. Specifically, if a filter condition references no grouping or aggregate columns (e.g. `WHERE false`) then it would be incorrectly pushed beneath an aggregate.

Intuitively, the only case where you can push a filter beneath an aggregate is when that filter is deterministic and is defined over the grouping columns / expressions, since in that case the filter is acting to exclude entire groups from the query (like a `HAVING` clause). The existing code would only push deterministic filters beneath aggregates when all of the filter's references were grouping columns, but this logic missed the case where a filter has no references. For example, `WHERE false` is deterministic but is independent of the actual data.

This patch fixes this minor bug by adding a new check to ensure that we don't push filters beneath aggregates when those filters don't reference any columns.

## How was this patch tested?

New regression test in FilterPushdownSuite.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15289 from JoshRosen/SPARK-17712.
2016-09-28 19:03:05 -07:00
Herman van Hovell 7d09232028 [SPARK-17641][SQL] Collect_list/Collect_set should not collect null values.
## What changes were proposed in this pull request?
We added native versions of `collect_set` and `collect_list` in Spark 2.0. These currently also (try to) collect null values, this is different from the original Hive implementation. This PR fixes this by adding a null check to the `Collect.update` method.

## How was this patch tested?
Added a regression test to `DataFrameAggregateSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15208 from hvanhovell/SPARK-17641.
2016-09-28 16:25:10 -07:00
Eric Liang 557d6e3227 [SPARK-17713][SQL] Move row-datasource related tests out of JDBCSuite
## What changes were proposed in this pull request?

As a followup for https://github.com/apache/spark/pull/15273 we should move non-JDBC specific tests out of that suite.

## How was this patch tested?

Ran the test.

Author: Eric Liang <ekl@databricks.com>

Closes #15287 from ericl/spark-17713.
2016-09-28 16:20:49 -07:00
Eric Liang a6cfa3f38b [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan
## What changes were proposed in this pull request?

It seems the equality check for reuse of `RowDataSourceScanExec` nodes doesn't respect the output schema. This can cause self-joins or unions over the same underlying data source to return incorrect results if they select different fields.

## How was this patch tested?

New unit test passes after the fix.

Author: Eric Liang <ekl@databricks.com>

Closes #15273 from ericl/spark-17673.
2016-09-28 13:22:45 -07:00
Josh Rosen b03b4adf6d [SPARK-17666] Ensure that RecordReaders are closed by data source file scans
## What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which do not fully-consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully-consumed.

This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

## How was this patch tested?

Tested manually for now.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15245 from JoshRosen/SPARK-17666-close-recordreader.
2016-09-27 17:52:57 -07:00
Josh Rosen 2f84a68660 [SPARK-17618] Guard against invalid comparisons between UnsafeRow and other formats
This patch ports changes from #15185 to Spark 2.x. In that patch, a  correctness bug in Spark 1.6.x which was caused by an invalid `equals()` comparison between an `UnsafeRow` and another row of a different format. Spark 2.x is not affected by that specific correctness bug but it can still reap the error-prevention benefits of that patch's changes, which modify  ``UnsafeRow.equals()` to throw an IllegalArgumentException if it is called with an object that is not an `UnsafeRow`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15265 from JoshRosen/SPARK-17618-master.
2016-09-27 14:14:27 -07:00
Reynold Xin 67c73052b8 [SPARK-17677][SQL] Break WindowExec.scala into multiple files
## What changes were proposed in this pull request?
As of Spark 2.0, all the window function execution code are in WindowExec.scala. This file is pretty large (over 1k loc) and has a lot of different abstractions in them. This patch creates a new package sql.execution.window, moves WindowExec.scala in it, and breaks WindowExec.scala into multiple, more maintainable pieces:

- AggregateProcessor.scala
- BoundOrdering.scala
- RowBuffer.scala
- WindowExec
- WindowFunctionFrame.scala

## How was this patch tested?
This patch mostly moves code around, and should not change any existing test coverage.

Author: Reynold Xin <rxin@databricks.com>

Closes #15252 from rxin/SPARK-17677.
2016-09-27 12:37:19 -07:00
gatorsmile 2ab24a7bf6 [SPARK-17660][SQL] DESC FORMATTED for VIEW Lacks View Definition
### What changes were proposed in this pull request?
Before this PR, `DESC FORMATTED` does not have a section for the view definition. We should add it for permanent views, like what Hive does.

```
+----------------------------+-------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                            |comment|
+----------------------------+-------------------------------------------------------------------------------------------------------------------------------------+-------+
|a                           |int                                                                                                                                  |null   |
|                            |                                                                                                                                     |       |
|# Detailed Table Information|                                                                                                                                     |       |
|Database:                   |default                                                                                                                              |       |
|Owner:                      |xiaoli                                                                                                                               |       |
|Create Time:                |Sat Sep 24 21:46:19 PDT 2016                                                                                                         |       |
|Last Access Time:           |Wed Dec 31 16:00:00 PST 1969                                                                                                         |       |
|Location:                   |                                                                                                                                     |       |
|Table Type:                 |VIEW                                                                                                                                 |       |
|Table Parameters:           |                                                                                                                                     |       |
|  transient_lastDdlTime     |1474778779                                                                                                                           |       |
|                            |                                                                                                                                     |       |
|# Storage Information       |                                                                                                                                     |       |
|SerDe Library:              |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe                                                                                   |       |
|InputFormat:                |org.apache.hadoop.mapred.SequenceFileInputFormat                                                                                     |       |
|OutputFormat:               |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat                                                                            |       |
|Compressed:                 |No                                                                                                                                   |       |
|Storage Desc Parameters:    |                                                                                                                                     |       |
|  serialization.format      |1                                                                                                                                    |       |
|                            |                                                                                                                                     |       |
|# View Information          |                                                                                                                                     |       |
|View Original Text:         |SELECT * FROM tbl                                                                                                                    |       |
|View Expanded Text:         |SELECT `gen_attr_0` AS `a` FROM (SELECT `gen_attr_0` FROM (SELECT `a` AS `gen_attr_0` FROM `default`.`tbl`) AS gen_subquery_0) AS tbl|       |
+----------------------------+-------------------------------------------------------------------------------------------------------------------------------------+-------+
```

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15234 from gatorsmile/descFormattedView.
2016-09-27 10:52:26 -07:00
Reynold Xin 120723f934 [SPARK-17682][SQL] Mark children as final for unary, binary, leaf expressions and plan nodes
## What changes were proposed in this pull request?
This patch marks the children method as final in unary, binary, and leaf expressions and plan nodes (both logical plan and physical plan), as brought up in http://apache-spark-developers-list.1001551.n3.nabble.com/Should-LeafExpression-have-children-final-override-like-Nondeterministic-td19104.html

## How was this patch tested?
This is a simple modifier change and has no impact on test coverage.

Author: Reynold Xin <rxin@databricks.com>

Closes #15256 from rxin/SPARK-17682.
2016-09-27 10:20:30 -07:00
hyukjinkwon 2cac3b2d4a [SPARK-16516][SQL] Support for pushing down filters for decimal and timestamp types in ORC
## What changes were proposed in this pull request?

It seems ORC supports all the types in  ([`PredicateLeaf.Type`](e085b7e9bd/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java (L50-L56))) which includes timestamp type and decimal type.

In more details, the types listed in [`SearchArgumentImpl.boxLiteral()`](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) can be used as a filter value.

FYI, inital `case` caluse for supported types was introduced in 65d71bd9fb and this was not changed overtime. At that time, Hive version was, 0.13 which supports only some types for filter-push down (See [SearchArgumentImpl.java#L945-L965](https://github.com/apache/hive/blob/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L945-L965) at 0.13).

However, the version was upgraded into 1.2.x and now it supports more types (See [SearchArgumentImpl.java#L1068-L1093](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) at 1.2.0)

## How was this patch tested?

Unit tests in `OrcFilterSuite` and `OrcQuerySuite`

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14172 from HyukjinKwon/SPARK-16516.
2016-09-28 00:50:12 +08:00
hyukjinkwon 5de1737b02 [SPARK-16777][SQL] Do not use deprecated listType API in ParquetSchemaConverter
## What changes were proposed in this pull request?

This PR removes build waning as below.

```scala
[WARNING] .../spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala:448: method listType in object ConversionPatterns is deprecated: see corresponding Javadoc for more information.
[WARNING]         ConversionPatterns.listType(
[WARNING]                            ^
[WARNING] .../spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala:464: method listType in object ConversionPatterns is deprecated: see corresponding Javadoc for more information.
[WARNING]         ConversionPatterns.listType(
[WARNING]                            ^
```

This should not use `listOfElements` (recommended to be replaced from `listType`) instead because the new method checks if the name of elements in Parquet's `LIST` is `element` in Parquet schema and throws an exception if not. However, It seems Spark prior to 1.4.x writes `ArrayType` with Parquet's `LIST` but with `array` as its element name.

Therefore, this PR avoids to use both `listOfElements` and `listType` but just use the existing schema builder to construct the same `GroupType`.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14399 from HyukjinKwon/SPARK-16777.
2016-09-28 00:39:47 +08:00
Kazuaki Ishizaki 85b0a15754 [SPARK-15962][SQL] Introduce implementation with a dense format for UnsafeArrayData
## What changes were proposed in this pull request?

This PR introduces more compact representation for ```UnsafeArrayData```.

```UnsafeArrayData``` needs to accept ```null``` value in each entry of an array. In the current version, it has three parts
```
[numElements] [offsets] [values]
```
`Offsets` has the number of `numElements`, and represents `null` if its value is negative. It may increase memory footprint, and introduces an indirection for accessing each of `values`.

This PR uses bitvectors to represent nullability for each element like `UnsafeRow`, and eliminates an indirection for accessing each element. The new ```UnsafeArrayData``` has four parts.
```
[numElements][null bits][values or offset&length][variable length portion]
```
In the `null bits` region, we store 1 bit per element, represents whether an element is null. Its total size is ceil(numElements / 8) bytes, and it is aligned to 8-byte boundaries.
In the `values or offset&length` region, we store the content of elements. For fields that hold fixed-length primitive types, such as long, double, or int, we store the value directly in the field. For fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the base address of the array) that points to the beginning of the variable-length field and length (they are combined into a long). Each is word-aligned. For `variable length portion`, each is aligned to 8-byte boundaries.

The new format can reduce memory footprint and improve performance of accessing each element. An example of memory foot comparison:
1024x1024 elements integer array
Size of ```baseObject``` for ```UnsafeArrayData```: 8 + 1024x1024 + 1024x1024 = 2M bytes
Size of ```baseObject``` for ```UnsafeArrayData```: 8 + 1024x1024/8 + 1024x1024 = 1.25M bytes

In summary, we got 1.0-2.6x performance improvements over the code before applying this PR.
Here are performance results of [benchmark programs](04d2e4b6db/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala):

**Read UnsafeArrayData**: 1.7x and 1.6x performance improvements over the code before applying this PR
````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Read UnsafeArrayData:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            430 /  436        390.0           2.6       1.0X
Double                                         456 /  485        367.8           2.7       0.9X

With SPARK-15962
Read UnsafeArrayData:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            252 /  260        666.1           1.5       1.0X
Double                                         281 /  292        597.7           1.7       0.9X
````
**Write UnsafeArrayData**: 1.0x and 1.1x performance improvements over the code before applying this PR
````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Write UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            203 /  273        103.4           9.7       1.0X
Double                                         239 /  356         87.9          11.4       0.8X

With SPARK-15962
Write UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            196 /  249        107.0           9.3       1.0X
Double                                         227 /  367         92.3          10.8       0.9X
````

**Get primitive array from UnsafeArrayData**: 2.6x and 1.6x performance improvements over the code before applying this PR
````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Get primitive array from UnsafeArrayData: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            207 /  217        304.2           3.3       1.0X
Double                                         257 /  363        245.2           4.1       0.8X

With SPARK-15962
Get primitive array from UnsafeArrayData: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            151 /  198        415.8           2.4       1.0X
Double                                         214 /  394        293.6           3.4       0.7X
````

**Create UnsafeArrayData from primitive array**: 1.7x and 2.1x performance improvements over the code before applying this PR
````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Create UnsafeArrayData from primitive array: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            340 /  385        185.1           5.4       1.0X
Double                                         479 /  705        131.3           7.6       0.7X

With SPARK-15962
Create UnsafeArrayData from primitive array: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            206 /  211        306.0           3.3       1.0X
Double                                         232 /  406        271.6           3.7       0.9X
````

1.7x and 1.4x performance improvements in [```UDTSerializationBenchmark```](https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala)  over the code before applying this PR
````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
VectorUDT de/serialization:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
serialize                                      442 /  533          0.0      441927.1       1.0X
deserialize                                    217 /  274          0.0      217087.6       2.0X

With SPARK-15962
VectorUDT de/serialization:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
serialize                                      265 /  318          0.0      265138.5       1.0X
deserialize                                    155 /  197          0.0      154611.4       1.7X
````

## How was this patch tested?

Added unit tests into ```UnsafeArraySuite```

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #13680 from kiszk/SPARK-15962.
2016-09-27 14:18:32 +08:00
Sameer Agarwal 7c7586aef9 [SPARK-17652] Fix confusing exception message while reserving capacity
## What changes were proposed in this pull request?

This minor patch fixes a confusing exception message while reserving additional capacity in the vectorized parquet reader.

## How was this patch tested?

Exisiting Unit Tests

Author: Sameer Agarwal <sameerag@cs.berkeley.edu>

Closes #15225 from sameeragarwal/error-msg.
2016-09-26 13:21:08 -07:00
Liang-Chi Hsieh 8135e0e5eb [SPARK-17153][SQL] Should read partition data when reading new files in filestream without globbing
## What changes were proposed in this pull request?

When reading file stream with non-globbing path, the results return data with all `null`s for the
partitioned columns. E.g.,

    case class A(id: Int, value: Int)
    val data = spark.createDataset(Seq(
      A(1, 1),
      A(2, 2),
      A(2, 3))
    )
    val url = "/tmp/test"
    data.write.partitionBy("id").parquet(url)
    spark.read.parquet(url).show

    +-----+---+
    |value| id|
    +-----+---+
    |    2|  2|
    |    3|  2|
    |    1|  1|
    +-----+---+

    val s = spark.readStream.schema(spark.read.load(url).schema).parquet(url)
    s.writeStream.queryName("test").format("memory").start()

    sql("SELECT * FROM test").show

    +-----+----+
    |value|  id|
    +-----+----+
    |    2|null|
    |    3|null|
    |    1|null|
    +-----+----+

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #14803 from viirya/filestreamsource-option.
2016-09-26 13:07:11 -07:00
Justin Pihony 50b89d05b7
[SPARK-14525][SQL] Make DataFrameWrite.save work for jdbc
## What changes were proposed in this pull request?

This change modifies the implementation of DataFrameWriter.save such that it works with jdbc, and the call to jdbc merely delegates to save.

## How was this patch tested?

This was tested via unit tests in the JDBCWriteSuite, of which I added one new test to cover this scenario.

## Additional details

rxin This seems to have been most recently touched by you and was also commented on in the JIRA.

This contribution is my original work and I license the work to the project under the project's open source license.

Author: Justin Pihony <justin.pihony@gmail.com>
Author: Justin Pihony <justin.pihony@typesafe.com>

Closes #12601 from JustinPihony/jdbc_reconciliation.
2016-09-26 09:54:22 +01:00
xin wu de333d121d [SPARK-17551][SQL] Add DataFrame API for null ordering
## What changes were proposed in this pull request?
This pull request adds Scala/Java DataFrame API for null ordering (NULLS FIRST | LAST).

Also did some minor clean up for related code (e.g. incorrect indentation), and renamed "orderby-nulls-ordering.sql" to be consistent with existing test files.

## How was this patch tested?
Added a new test case in DataFrameSuite.

Author: petermaxlee <petermaxlee@gmail.com>
Author: Xin Wu <xinwu@us.ibm.com>

Closes #15123 from petermaxlee/SPARK-17551.
2016-09-25 16:46:12 -07:00
Michael Armbrust 988c714573 [SPARK-17643] Remove comparable requirement from Offset
For some sources, it is difficult to provide a global ordering based only on the data in the offset.  Since we don't use comparison for correctness, lets remove it.

Author: Michael Armbrust <michael@databricks.com>

Closes #15207 from marmbrus/removeComparable.
2016-09-23 12:17:59 -07:00
Shixiong Zhu 62ccf27ab4 [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry
## What changes were proposed in this pull request?

Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we can make sure not writing any FileEntry(..., batchId = -1) into the log. This also avoids people misusing it in future (#15203 is an example).

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15206 from zsxwing/cleanup.
2016-09-22 23:35:08 -07:00
Yucai Yu 79159a1e87 [SPARK-17635][SQL] Remove hardcode "agg_plan" in HashAggregateExec
## What changes were proposed in this pull request?

"agg_plan" are hardcoded in HashAggregateExec, which have potential issue, so removing them.

## How was this patch tested?

existing tests.

Author: Yucai Yu <yucai.yu@intel.com>

Closes #15199 from yucai/agg_plan.
2016-09-22 17:22:56 -07:00
Burak Yavuz a166196831 [SPARK-17569][SPARK-17569][TEST] Make the unit test added for work again
## What changes were proposed in this pull request?

A [PR](a6aade0042) was merged concurrently that made the unit test for PR #15122 not test anything anymore. This PR fixes the test.

## How was this patch tested?

Changed line 0d63487502/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala (L137)
from `false` to `true` and made sure the unit test failed.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15203 from brkyvz/fix-test.
2016-09-22 16:50:22 -07:00
Herman van Hovell 0d63487502 [SPARK-17616][SQL] Support a single distinct aggregate combined with a non-partial aggregate
## What changes were proposed in this pull request?
We currently cannot execute an aggregate that contains a single distinct aggregate function and an one or more non-partially plannable aggregate functions, for example:
```sql
select   grp,
         collect_list(col1),
         count(distinct col2)
from     tbl_a
group by 1
```
This is a regression from Spark 1.6. This is caused by the fact that the single distinct aggregation code path assumes that all aggregates can be planned in two phases (is partially aggregatable). This PR works around this issue by triggering the `RewriteDistinctAggregates` in such cases (this is similar to the approach taken in 1.6).

## How was this patch tested?
Created `RewriteDistinctAggregatesSuite` which checks if the aggregates with distinct aggregate functions get rewritten into two `Aggregates` and an `Expand`. Added a regression test to `DataFrameAggregateSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15187 from hvanhovell/SPARK-17616.
2016-09-22 14:29:27 -07:00
Burak Yavuz 85d609cf25 [SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames
## What changes were proposed in this pull request?

Consider you have a bucket as `s3a://some-bucket`
and under it you have files:
```
s3a://some-bucket/file1.parquet
s3a://some-bucket/file2.parquet
```
Getting the parent path of `s3a://some-bucket/file1.parquet` yields
`s3a://some-bucket/` and the ListingFileCatalog uses this as the key in the hash map.

When catalog.allFiles is called, we use `s3a://some-bucket` (no slash at the end) to get the list of files, and we're left with an empty list!

This PR fixes this by adding a `/` at the end of the `URI` iff the given `Path` doesn't have a parent, i.e. is the root. This is a no-op if the path already had a `/` at the end, and is handled through the Hadoop Path, path merging semantics.

## How was this patch tested?

Unit test in `FileCatalogSuite`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15169 from brkyvz/SPARK-17613.
2016-09-22 13:05:41 -07:00