Commit graph

1320 commits

Author SHA1 Message Date
gatorsmile 68be5b9e8a [SPARK-14396][SQL] Throw Exceptions for DDLs of Partitioned Views
#### What changes were proposed in this pull request?

Because the concept of partitioning is associated with physical tables, we disable all the supports of partitioned views, which are defined in the following three commands in [Hive DDL Manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/AlterView):
```
ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];

ALTER VIEW view ADD [IF NOT EXISTS] PARTITION spec;

CREATE VIEW [IF NOT EXISTS] [db_name.]view_name [(column_name [COMMENT column_comment], ...) ]
  [COMMENT view_comment]
  [TBLPROPERTIES (property_name = property_value, ...)]
  AS SELECT ...;
```

An exception is thrown when users issue any of these three DDL commands.

#### How was this patch tested?
Added test cases for parsing create view and changed the existing test cases to verify if the exceptions are thrown.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12169 from gatorsmile/viewPartition.
2016-04-05 22:33:44 -07:00
Andrew Or adbfdb878d [SPARK-14128][SQL] Alter table DDL followup
## What changes were proposed in this pull request?

This is just a followup to #12121, which implemented the alter table DDLs using the `SessionCatalog`. Specially, this corrects the behavior of setting the location of a datasource table. For datasource tables, we need to set the `locationUri` in addition to the `path` entry in the serde properties. Additionally, changing the location of a datasource table partition is not allowed.

## How was this patch tested?

`DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #12186 from andrewor14/alter-table-ddl-followup.
2016-04-05 21:23:20 -07:00
Wenchen Fan f6456fa80b [SPARK-14296][SQL] whole stage codegen support for Dataset.map
## What changes were proposed in this pull request?

This PR adds a new operator `MapElements` for `Dataset.map`, it's a 1-1 mapping and is easier to adapt to whole stage codegen framework.

## How was this patch tested?

new test in `WholeStageCodegenSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12087 from cloud-fan/map.
2016-04-06 12:09:10 +08:00
Eric Liang 7d29c72f64 [SPARK-14359] Unit tests for java 8 lambda syntax with typed aggregates
## What changes were proposed in this pull request?

Adds unit tests for java 8 lambda syntax with typed aggregates as a follow-up to #12168

## How was this patch tested?

Unit tests.

Author: Eric Liang <ekl@databricks.com>

Closes #12181 from ericl/sc-2794-2.
2016-04-05 21:22:20 -05:00
Marcelo Vanzin d5ee9d5c24 [SPARK-529][SQL] Modify SQLConf to use new config API from core.
Because SQL keeps track of all known configs, some customization was
needed in SQLConf to allow that, since the core API does not have that
feature.

Tested via existing (and slightly updated) unit tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #11570 from vanzin/SPARK-529-sql.
2016-04-05 15:19:51 -07:00
Andrew Or 45d8cdee39 [SPARK-14129][SPARK-14128][SQL] Alter table DDL commands
## What changes were proposed in this pull request?

In Spark 2.0, we want to handle the most common `ALTER TABLE` commands ourselves instead of passing the entire query text to Hive. This is done using the new `SessionCatalog` API introduced recently.

The commands supported in this patch include:
```
ALTER TABLE ... RENAME TO ...
ALTER TABLE ... SET TBLPROPERTIES ...
ALTER TABLE ... UNSET TBLPROPERTIES ...
ALTER TABLE ... SET LOCATION ...
ALTER TABLE ... SET SERDE ...
```
The commands we explicitly do not support are:
```
ALTER TABLE ... CLUSTERED BY ...
ALTER TABLE ... SKEWED BY ...
ALTER TABLE ... NOT CLUSTERED
ALTER TABLE ... NOT SORTED
ALTER TABLE ... NOT SKEWED
ALTER TABLE ... NOT STORED AS DIRECTORIES
```
For these we throw exceptions complaining that they are not supported.

## How was this patch tested?

`DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #12121 from andrewor14/alter-table-ddl.
2016-04-05 14:54:07 -07:00
Dongjoon Hyun c59abad052 [SPARK-14402][SQL] initcap UDF doesn't match Hive/Oracle behavior in lowercasing rest of string
## What changes were proposed in this pull request?

Current, SparkSQL `initCap` is using `toTitleCase` function. However, `UTF8String.toTitleCase` implementation changes only the first letter and just copy the other letters: e.g. sParK --> SParK. This is the correct implementation `toTitleCase`.
```
hive> select initcap('sParK');
Spark
```
```
scala> sql("select initcap('sParK')").head
res0: org.apache.spark.sql.Row = [SParK]
```

This PR updates the implementation of `initcap` using `toLowerCase` and `toTitleCase`.

## How was this patch tested?

Pass the Jenkins tests (including new testcase).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12175 from dongjoon-hyun/SPARK-14402.
2016-04-05 13:31:00 -07:00
Burak Yavuz 9ee5c25717 [SPARK-14353] Dataset Time Window window API for Python, and SQL
## What changes were proposed in this pull request?

The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008).
This PR adds the Python, and SQL, API for this function.

With this PR, SQL, Java, and Scala will share the same APIs as in users can use:
 - `window(timeColumn, windowDuration)`
 - `window(timeColumn, windowDuration, slideDuration)`
 - `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python, users can access all APIs above, but in addition they can do
 - In Python:
   `window(timeColumn, windowDuration, startTime=...)`

that is, they can provide the startTime without providing the `slideDuration`. In this case, we will generate tumbling windows.

## How was this patch tested?

Unit tests + manual tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12136 from brkyvz/python-windows.
2016-04-05 13:18:39 -07:00
Yin Huai 72544d6f2a [SPARK-14123][SPARK-14384][SQL] Handle CreateFunction/DropFunction
## What changes were proposed in this pull request?
This PR implements CreateFunction and DropFunction commands. Besides implementing these two commands, we also change how to manage functions. Here are the main changes.
* `FunctionRegistry` will be a container to store all functions builders and it will not actively load any functions. Because of this change, we do not need to maintain a separate registry for HiveContext. So, `HiveFunctionRegistry` is deleted.
* SessionCatalog takes care the job of loading a function if this function is not in the `FunctionRegistry` but its metadata is stored in the external catalog. For this case, SessionCatalog will (1) load the metadata from the external catalog, (2) load all needed resources (i.e. jars and files), (3) create a function builder based on the function definition, (4) register the function builder in the `FunctionRegistry`.
* A `UnresolvedGenerator` is created. So, the parser will not need to call `FunctionRegistry` directly during parsing, which is not a good time to create a Hive UDTF. In the analysis phase, we will resolve `UnresolvedGenerator`.

This PR is based on viirya's https://github.com/apache/spark/pull/12036/

## How was this patch tested?
Existing tests and new tests.

## TODOs
[x] Self-review
[x] Cleanup
[x] More tests for create/drop functions (we need to more tests for permanent functions).
[ ] File JIRAs for all TODOs
[x] Standardize the error message when a function does not exist.

Author: Yin Huai <yhuai@databricks.com>
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #12117 from yhuai/function.
2016-04-05 12:27:06 -07:00
Shixiong Zhu 463bac0011 [SPARK-14257][SQL] Allow multiple continuous queries to be started from the same DataFrame
## What changes were proposed in this pull request?

Make StreamingRelation store the closure to create the source in StreamExecution so that we can start multiple continuous queries from the same DataFrame.

## How was this patch tested?

`test("DataFrame reuse")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12049 from zsxwing/df-reuse.
2016-04-05 11:12:05 -07:00
gatorsmile 7807173679 [SPARK-14349][SQL] Issue Error Messages for Unsupported Operators/DML/DDL in SQL Context.
#### What changes were proposed in this pull request?

Currently, the weird error messages are issued if we use Hive Context-only operations in SQL Context.

For example,
- When calling `Drop Table` in SQL Context, we got the following message:
```
Expected exception org.apache.spark.sql.catalyst.parser.ParseException to be thrown, but java.lang.ClassCastException was thrown.
```

- When calling `Script Transform` in SQL Context, we got the message:
```
assertion failed: No plan for ScriptTransformation [key#9,value#10], cat, [tKey#155,tValue#156], null
+- LogicalRDD [key#9,value#10], MapPartitionsRDD[3] at beforeAll at BeforeAndAfterAll.scala:187
```

Updates:
Based on the investigation from hvanhovell , the root cause is `visitChildren`, which is the default implementation. It always returns the result of the last defined context child. After merging the code changes from hvanhovell , it works! Thank you hvanhovell !

#### How was this patch tested?
A few test cases are added.

Not sure if the same issue exist for the other operators/DDL/DML. hvanhovell

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Herman van Hovell <hvanhovell@questtec.nl>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12134 from gatorsmile/hiveParserCommand.
2016-04-05 11:19:46 +02:00
Dilip Biswal 2715bc68bd [SPARK-14348][SQL] Support native execution of SHOW TBLPROPERTIES command
## What changes were proposed in this pull request?

This PR adds Native execution of SHOW TBLPROPERTIES command.

Command Syntax:
``` SQL
SHOW TBLPROPERTIES table_name[(property_key_literal)]
```
## How was this patch tested?

Tests added in HiveComandSuiie and DDLCommandSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #12133 from dilipbiswal/dkb_show_tblproperties.
2016-04-05 08:41:59 +02:00
Eric Liang 064623014e [SPARK-14359] Create built-in functions for typed aggregates in Java
## What changes were proposed in this pull request?

This adds the corresponding Java static functions for built-in typed aggregates already exposed in Scala.

## How was this patch tested?

Unit tests.

rxin

Author: Eric Liang <ekl@databricks.com>

Closes #12168 from ericl/sc-2794.
2016-04-05 00:30:55 -05:00
Burak Yavuz ba24d1ee9a [SPARK-14287] isStreaming method for Dataset
With the addition of StreamExecution (ContinuousQuery) to Datasets, data will become unbounded. With unbounded data, the execution of some methods and operations will not make sense, e.g. `Dataset.count()`.

A simple API is required to check whether the data in a Dataset is bounded or unbounded. This will allow users to check whether their Dataset is in streaming mode or not. ML algorithms may check if the data is unbounded and throw an exception for example.

The implementation of this method is simple, however naming it is the challenge. Some possible names for this method are:
 - isStreaming
 - isContinuous
 - isBounded
 - isUnbounded

I've gone with `isStreaming` for now. We can change it before Spark 2.0 if we decide to come up with a different name. For that reason I've marked it as `Experimental`

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12080 from brkyvz/is-streaming.
2016-04-04 19:04:09 -07:00
Davies Liu 400b2f863f [SPARK-14259] [SQL] Merging small files together based on the cost of opening
## What changes were proposed in this pull request?

This PR basically re-do the things in #12068 but with a different model, which should work better in case of small files with different sizes.

## How was this patch tested?

Updated existing tests.

Ran a query on thousands of partitioned small files locally, with all default settings (the cost to open a file should be over estimated), the durations of tasks become smaller and smaller, which is good (the last few tasks will be shortest).

Author: Davies Liu <davies@databricks.com>

Closes #12095 from davies/file_cost.
2016-04-04 14:41:03 -07:00
Davies Liu cc70f17416 [SPARK-14334] [SQL] add toLocalIterator for Dataset/DataFrame
## What changes were proposed in this pull request?

RDD.toLocalIterator() could be used to fetch one partition at a time to reduce the memory usage. Right now, for Dataset/Dataframe we have to use df.rdd.toLocalIterator, which is super slow also requires lots of memory (because of the Java serializer or even Kyro serializer).

This PR introduce an optimized toLocalIterator for Dataset/DataFrame, which is much faster and requires much less memory. For a partition with 5 millions rows, `df.rdd.toIterator` took about 100 seconds, but df.toIterator took less than 7 seconds. For 10 millions row, rdd.toIterator will crash (not enough memory) with 4G heap, but df.toLocalIterator could finished in 12 seconds.

The JDBC server has been updated to use DataFrame.toIterator.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #12114 from davies/local_iterator.
2016-04-04 13:31:44 -07:00
Shixiong Zhu 855ed44ed3 [SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period
## What changes were proposed in this pull request?

Add a processing time trigger to control the batch processing speed

## How was this patch tested?

Unit tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11976 from zsxwing/trigger.
2016-04-04 10:54:06 -07:00
Davies Liu 745425332f [SPARK-14137] [SQL] Cleanup hash join
## What changes were proposed in this pull request?

This PR did a few cleanup on HashedRelation and HashJoin:

1) Merge HashedRelation and UniqueHashedRelation together
2) Return an iterator from HashedRelation, so we donot need a create many UnsafeRow objects.
3) Return a copy of HashedRelation for thread-safety in BroadcastJoin, so we can re-use the UnafeRow objects.
4) Cleanup HashJoin, share most of the code between BroadcastHashJoin and ShuffleHashJoin
5) Removed UniqueLongHashedRelation, which will be replaced by LongUnsafeMap (another PR).
6) Update benchmark, before this patch, the selectivity of joins are too high.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #12102 from davies/cleanup_hash.
2016-04-04 10:01:24 -07:00
Matei Zaharia 76f3c735aa [SPARK-14356] Update spark.sql.execution.debug to work on Datasets
## What changes were proposed in this pull request?

Update DebugQuery to work on Datasets of any type, not just DataFrames.

## How was this patch tested?

Added unit tests, checked in spark-shell.

Author: Matei Zaharia <matei@databricks.com>

Closes #12140 from mateiz/debug-dataset.
2016-04-03 21:08:54 -07:00
Dongjoon Hyun 3f749f7ed4 [SPARK-14355][BUILD] Fix typos in Exception/Testcase/Comments and static analysis results
## What changes were proposed in this pull request?

This PR contains the following 5 types of maintenance fix over 59 files (+94 lines, -93 lines).
- Fix typos(exception/log strings, testcase name, comments) in 44 lines.
- Fix lint-java errors (MaxLineLength) in 6 lines. (New codes after SPARK-14011)
- Use diamond operators in 40 lines. (New codes after SPARK-13702)
- Fix redundant semicolon in 5 lines.
- Rename class `InferSchemaSuite` to `CSVInferSchemaSuite` in CSVInferSchemaSuite.scala.

## How was this patch tested?

Manual and pass the Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12139 from dongjoon-hyun/SPARK-14355.
2016-04-03 18:14:16 -07:00
hyukjinkwon 2262a93358 [SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-14231

Currently, JSON data source supports to infer `DecimalType` for big numbers and `floatAsBigDecimal` option which reads floating-point values as `DecimalType`.

But there are few restrictions in Spark `DecimalType` below:

1. The precision cannot be bigger than 38.
2. scale cannot be bigger than precision.

Currently, both restrictions are not being handled.

This PR handles the cases by inferring them as `DoubleType`. Also, the option name was changed from `floatAsBigDecimal` to `prefersDecimal` as suggested [here](https://issues.apache.org/jira/browse/SPARK-14231?focusedCommentId=15215579&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15215579).

So, the codes below:

```scala
def doubleRecords: RDD[String] =
  sqlContext.sparkContext.parallelize(
    s"""{"a": 1${"0" * 38}, "b": 0.01}""" ::
    s"""{"a": 2${"0" * 38}, "b": 0.02}""" :: Nil)

val jsonDF = sqlContext.read
  .option("prefersDecimal", "true")
  .json(doubleRecords)
jsonDF.printSchema()
```

produces below:

- **Before**

```scala
org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).;
	at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:44)
	at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:144)
	at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:108)
	at
...
```

- **After**

```scala
root
 |-- a: double (nullable = true)
 |-- b: double (nullable = true)
```

## How was this patch tested?

Unit tests were used and `./dev/run_tests` for coding style tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12030 from HyukjinKwon/SPARK-14231.
2016-04-02 23:12:04 -07:00
Dongjoon Hyun 4a6e78abd9 [MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.
## What changes were proposed in this pull request?

This PR aims to fix all Scala-Style multiline comments into Java-Style multiline comments in Scala codes.
(All comment-only changes over 77 files: +786 lines, −747 lines)

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12130 from dongjoon-hyun/use_multiine_javadoc_comments.
2016-04-02 17:50:40 -07:00
Reynold Xin a3e293542a [HOTFIX] Disable StateStoreSuite.maintenance 2016-04-02 12:44:02 -07:00
Reynold Xin 67d753516d [HOTFIX] Fix compilation break. 2016-04-02 00:00:19 -07:00
hyukjinkwon d7982a3a9a [MINOR][SQL] Fix comments styl and correct several styles and nits in CSV data source
## What changes were proposed in this pull request?

While trying to create a PR (which was not an issue at the end), I just corrected some style nits.

So, I removed the changes except for some coding style corrections.

- According to the [scala-style-guide#documentation-style](https://github.com/databricks/scala-style-guide#documentation-style), Scala style comments are discouraged.

>```scala
>/** This is a correct one-liner, short description. */
>
>/**
>  * This is correct multi-line JavaDoc comment. And
>  * this is my second line, and if I keep typing, this would be
>  * my third line.
>  */
>
>/** In Spark, we don't use the ScalaDoc style so this
>   * is not correct.
>   */
>```

- Double newlines between consecutive methods was removed. According to [scala-style-guide#blank-lines-vertical-whitespace](https://github.com/databricks/scala-style-guide#blank-lines-vertical-whitespace), single newline appears when

>Between consecutive members (or initializers) of a class: fields, constructors, methods, nested classes, static initializers, instance initializers.

- Remove uesless parentheses in tests

- Use `mapPartitions` instead of `mapPartitionsWithIndex()`.

## How was this patch tested?

Unit tests were used and `dev/run_tests` for style tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12109 from HyukjinKwon/SPARK-14271.
2016-04-01 22:51:47 -07:00
Reynold Xin f414154418 [SPARK-14285][SQL] Implement common type-safe aggregate functions
## What changes were proposed in this pull request?
In the Dataset API, it is fairly difficult for users to perform simple aggregations in a type-safe way at the moment because there are no aggregators that have been implemented. This pull request adds a few common aggregate functions in expressions.scala.typed package, and also creates the expressions.java.typed package without implementation. The java implementation should probably come as a separate pull request. One challenge there is to resolve the type difference between Scala primitive types and Java boxed types.

## How was this patch tested?
Added unit tests for them.

Author: Reynold Xin <rxin@databricks.com>

Closes #12077 from rxin/SPARK-14285.
2016-04-01 22:46:56 -07:00
Dongjoon Hyun fa1af0aff7 [SPARK-14251][SQL] Add SQL command for printing out generated code for debugging
## What changes were proposed in this pull request?

This PR implements `EXPLAIN CODEGEN` SQL command which returns generated codes like `debugCodegen`. In `spark-shell`, we don't need to `import debug` module. In `spark-sql`, we can use this SQL command now.

**Before**
```
scala> import org.apache.spark.sql.execution.debug._
scala> sql("select 'a' as a group by 1").debugCodegen()
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
...

Generated code:
...

== Subtree 2 / 2 ==
...

Generated code:
...
```

**After**
```
scala> sql("explain extended codegen select 'a' as a group by 1").collect().foreach(println)
[Found 2 WholeStageCodegen subtrees.]
[== Subtree 1 / 2 ==]
...
[]
[Generated code:]
...
[]
[== Subtree 2 / 2 ==]
...
[]
[Generated code:]
...
```

## How was this patch tested?

Pass the Jenkins tests (including new testcases)

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12099 from dongjoon-hyun/SPARK-14251.
2016-04-01 22:45:52 -07:00
Kazuaki Ishizaki 877dc712e6 [SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames
## What changes were proposed in this pull request?

This PR reduces Java byte code size of method in ```SpecificColumnarIterator``` by using a approach to make a group for  lot of ```ColumnAccessor``` instantiations or method calls (more than 200) into a method

## How was this patch tested?

Added a new unit test, which includes large instantiations and method calls, to ```InMemoryColumnarQuerySuite```

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #12108 from kiszk/SPARK-14138-master.
2016-04-01 22:38:07 -07:00
Michael Armbrust 0fc4aaa71c [SPARK-14255][SQL] Streaming Aggregation
This PR adds the ability to perform aggregations inside of a `ContinuousQuery`.  In order to implement this feature, the planning of aggregation has augmented with a new `StatefulAggregationStrategy`.  Unlike batch aggregation, stateful-aggregation uses the `StateStore` (introduced in #11645) to persist the results of partial aggregation across different invocations.  The resulting physical plan performs the aggregation using the following progression:
   - Partial Aggregation
   - Shuffle
   - Partial Merge (now there is at most 1 tuple per group)
   - StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous)
   - Partial Merge (now there is at most 1 tuple per group)
   - StateStoreSave (saves the tuple for the next batch)
   - Complete (output the current result of the aggregation)

The following refactoring was also performed to allow us to plug into existing code:
 - The get/put implementation is taken from #12013
 - The logic for breaking down and de-duping the physical execution of aggregation has been move into a new pattern `PhysicalAggregation`
 - The `AttributeReference` used to identify the result of an `AggregateFunction` as been moved into the `AggregateExpression` container.  This change moves the reference into the same object as the other intermediate references used in aggregation and eliminates the need to pass around a `Map[(AggregateFunction, Boolean), Attribute]`.  Further clean up (using a different aggregation container for logical/physical plans) is deferred to a followup.
 - Some planning logic is moved from the `SessionState` into the `QueryExecution` to make it easier to override in the streaming case.
 - The ability to write a `StreamTest` that checks only the output of the last batch has been added to simulate the future addition of output modes.

Author: Michael Armbrust <michael@databricks.com>

Closes #12048 from marmbrus/statefulAgg.
2016-04-01 15:15:16 -07:00
Shixiong Zhu 0b7d4966ca [SPARK-14316][SQL] StateStoreCoordinator should extend ThreadSafeRpcEndpoint
## What changes were proposed in this pull request?

RpcEndpoint is not thread safe and allows multiple messages to be processed at the same time. StateStoreCoordinator should use ThreadSafeRpcEndpoint.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12100 from zsxwing/fix-StateStoreCoordinator.
2016-04-01 15:00:38 -07:00
Liang-Chi Hsieh 3e991dbc31 [SPARK-13674] [SQL] Add wholestage codegen support to Sample
JIRA: https://issues.apache.org/jira/browse/SPARK-13674

## What changes were proposed in this pull request?

Sample operator doesn't support wholestage codegen now. This pr is to add support to it.

## How was this patch tested?

A test is added into `BenchmarkWholeStageCodegen`. Besides, all tests should be passed.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #11517 from viirya/add-wholestage-sample.
2016-04-01 14:02:32 -07:00
Burak Yavuz 1b829ce139 [SPARK-14160] Time Windowing functions for Datasets
## What changes were proposed in this pull request?

This PR adds the function `window` as a column expression.

`window` can be used to bucket rows into time windows given a time column. With this expression, performing time series analysis on batch data, as well as streaming data should become much more simpler.

### Usage

Assume the following schema:

`sensor_id, measurement, timestamp`

To average 5 minute data every 1 minute (window length of 5 minutes, slide duration of 1 minute), we will use:
```scala
df.groupBy(window("timestamp", “5 minutes”, “1 minute”), "sensor_id")
  .agg(mean("measurement").as("avg_meas"))
```

This will generate windows such as:
```
09:00:00-09:05:00
09:01:00-09:06:00
09:02:00-09:07:00 ...
```

Intervals will start at every `slideDuration` starting at the unix epoch (1970-01-01 00:00:00 UTC).
To start intervals at a different point of time, e.g. 30 seconds after a minute, the `startTime` parameter can be used.

```scala
df.groupBy(window("timestamp", “5 minutes”, “1 minute”, "30 second"), "sensor_id")
  .agg(mean("measurement").as("avg_meas"))
```

This will generate windows such as:
```
09:00:30-09:05:30
09:01:30-09:06:30
09:02:30-09:07:30 ...
```

Support for Python will be made in a follow up PR after this.

## How was this patch tested?

This patch has some basic unit tests for the `TimeWindow` expression testing that the parameters pass validation, and it also has some unit/integration tests testing the correctness of the windowing and usability in complex operations (multi-column grouping, multi-column projections, joins).

Author: Burak Yavuz <brkyvz@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #12008 from brkyvz/df-time-window.
2016-04-01 13:19:24 -07:00
Dilip Biswal 0b04f8fdf1 [SPARK-14184][SQL] Support native execution of SHOW DATABASE command and fix SHOW TABLE to use table identifier pattern
## What changes were proposed in this pull request?

This PR addresses the following

1. Supports native execution of SHOW DATABASES command
2. Fixes SHOW TABLES to apply the identifier_with_wildcards pattern if supplied.

SHOW TABLE syntax
```
SHOW TABLES [IN database_name] ['identifier_with_wildcards'];
```
SHOW DATABASES syntax
```
SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards'];
```

## How was this patch tested?
Tests added in SQLQuerySuite (both hive and sql contexts) and DDLCommandSuite

Note: Since the table name pattern was not working , tests are added in both SQLQuerySuite to
verify the application of the table pattern.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #11991 from dilipbiswal/dkb_show_database.
2016-04-01 18:27:11 +02:00
Shixiong Zhu e785402826 [SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the java.io.tmpdir folder
## What changes were proposed in this pull request?

If I press `CTRL-C` when running these tests, the temp files will be left in `sql/core` folder and I need to delete them manually. It's annoying. This PR just moves the temp files to the `java.io.tmpdir` folder and add a name prefix for them.

## How was this patch tested?

Existing Jenkins tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12093 from zsxwing/temp-file.
2016-03-31 12:17:25 -07:00
gatorsmile 446c45bd87 [SPARK-14182][SQL] Parse DDL Command: Alter View
This PR is to provide native parsing support for DDL commands: `Alter View`. Since its AST trees are highly similar to `Alter Table`. Thus, both implementation are integrated into the same one.

Based on the Hive DDL document:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL and https://cwiki.apache.org/confluence/display/Hive/PartitionedViews

**Syntax:**
```SQL
ALTER VIEW view_name RENAME TO new_view_name
```
 - to change the name of a view to a different name

**Syntax:**
```SQL
ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment);
```
 - to add metadata to a view

**Syntax:**
```SQL
ALTER VIEW view_name UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key')
```
 - to remove metadata from a view

**Syntax:**
```SQL
ALTER VIEW view_name ADD [IF NOT EXISTS] PARTITION spec1[, PARTITION spec2, ...]
```
 - to add the partitioning metadata for a view.
 - the syntax of partition spec in `ALTER VIEW` is identical to `ALTER TABLE`, **EXCEPT** that it is **ILLEGAL** to specify a `LOCATION` clause.

**Syntax:**
```SQL
ALTER VIEW view_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
```
 - to drop the related partition metadata for a view.

Added the related test cases to `DDLCommandSuite`

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #11987 from gatorsmile/parseAlterView.
2016-03-31 12:04:03 -07:00
Sameer Agarwal 8d6207206c [SPARK-14263][SQL] Benchmark Vectorized HashMap for GroupBy Aggregates
## What changes were proposed in this pull request?

This PR proposes a new data-structure based on a vectorized hashmap that can be potentially _codegened_ in `TungstenAggregate` to speed up aggregates with group by. Micro-benchmarks show a 10x improvement over the current `BytesToBytes` aggregation map.

## How was this patch tested?

    Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz
    BytesToBytesMap:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    hash                                      108 /  119         96.9          10.3       1.0X
    fast hash                                  63 /   70        166.2           6.0       1.7X
    arrayEqual                                 70 /   73        150.8           6.6       1.6X
    Java HashMap (Long)                       141 /  200         74.3          13.5       0.8X
    Java HashMap (two ints)                   145 /  185         72.3          13.8       0.7X
    Java HashMap (UnsafeRow)                  499 /  524         21.0          47.6       0.2X
    BytesToBytesMap (off Heap)                483 /  548         21.7          46.0       0.2X
    BytesToBytesMap (on Heap)                 485 /  562         21.6          46.2       0.2X
    Vectorized Hashmap                         54 /   60        193.7           5.2       2.0X

Author: Sameer Agarwal <sameer@databricks.com>

Closes #12055 from sameeragarwal/vectorized-hashmap.
2016-03-31 11:53:13 -07:00
Herman van Hovell a9b93e0739 [SPARK-14211][SQL] Remove ANTLR3 based parser
### What changes were proposed in this pull request?

This PR removes the ANTLR3 based parser, and moves the new ANTLR4 based parser into the `org.apache.spark.sql.catalyst.parser package`.

### How was this patch tested?

Existing unit tests.

cc rxin andrewor14 yhuai

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12071 from hvanhovell/SPARK-14211.
2016-03-31 09:25:09 -07:00
Cheng Lian 26445c2e47 [SPARK-14206][SQL] buildReader() implementation for CSV
## What changes were proposed in this pull request?

Major changes:

1. Implement `FileFormat.buildReader()` for the CSV data source.
1. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema`, which is basically the result of `FileFormat.inferSchema` or user specified schema.

   This argument is necessary because the CSV data source needs to know all the columns of the underlying files to read the file.

## How was this patch tested?

Existing tests should do the work.

Author: Cheng Lian <lian@databricks.com>

Closes #12002 from liancheng/spark-14206-csv-build-reader.
2016-03-30 18:21:06 -07:00
Travis Crawford da54abfd87 [SPARK-14081][SQL] - Preserve DataFrame column types when filling nulls.
## What changes were proposed in this pull request?
This change resolves an issue where `DataFrameNaFunctions.fill` changes a `FloatType` column to a `DoubleType`. We also clarify the contract that replacement values will be cast to the column data type, which may change the replacement value when casting to a lower precision type.

## How was this patch tested?
This patch has associated unit tests.

Author: Travis Crawford <travis@medium.com>

Closes #11967 from traviscrawford/SPARK-14081-dataframena.
2016-03-30 16:59:52 -07:00
Takeshi YAMAMURO dadf0138b3 [SPARK-14259][SQL] Add a FileSourceStrategy option for limiting #files in a partition
## What changes were proposed in this pull request?
This pr is to add a config to control the maximum number of files as even small files have a non-trivial fixed cost. The current packing can put a lot of small files together which cases straggler tasks.

## How was this patch tested?
I added tests to check if many files get split into partitions in FileSourceStrategySuite.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #12068 from maropu/SPARK-14259.
2016-03-30 16:02:48 -07:00
Wenchen Fan d46c71b39d [SPARK-14268][SQL] rename toRowExpressions and fromRowExpression to serializer and deserializer in ExpressionEncoder
## What changes were proposed in this pull request?

In `ExpressionEncoder`, we use `constructorFor` to build `fromRowExpression` as the `deserializer` in `ObjectOperator`. It's kind of confusing, we should make the name consistent.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12058 from cloud-fan/rename.
2016-03-30 11:03:15 -07:00
gatorsmile b66b97cd04 [SPARK-14124][SQL] Implement Database-related DDL Commands
#### What changes were proposed in this pull request?
This PR is to implement the following four Database-related DDL commands:
 - `CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name`
 - `DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]`
 - `DESCRIBE DATABASE [EXTENDED] db_name`
 - `ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)`

Another PR will be submitted to handle the unsupported commands. In the Database-related DDL commands, we will issue an error exception for `ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role`.

cc yhuai andrewor14 rxin Could you review the changes? Is it in the right direction? Thanks!

#### How was this patch tested?
Added a few test cases in `command/DDLSuite.scala` for testing DDL command execution in `SQLContext`. Since `HiveContext` also shares the same implementation, the existing test cases in `\hive` also verifies the correctness of these commands.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12009 from gatorsmile/dbDDL.
2016-03-29 17:39:52 -07:00
Eric Liang e58c4cb3c5 [SPARK-14227][SQL] Add method for printing out generated code for debugging
## What changes were proposed in this pull request?

This adds `debugCodegen` to the debug package for query execution.

## How was this patch tested?

Unit and manual testing. Output example:

```
scala> import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.debug._

scala> sqlContext.range(100).groupBy("id").count().orderBy("id").debugCodegen()
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 ==
WholeStageCodegen
:  +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L])
:     +- Range 0, 1, 1, 100, [id#0L]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L])
/* 007 */ +- Range 0, 1, 1, 100, [id#0L]
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean agg_initAgg;
/* 012 */   private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 014 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 015 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 016 */   private org.apache.spark.sql.execution.metric.LongSQLMetric range_numOutputRows;
/* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue range_metricValue;
/* 018 */   private boolean range_initRange;
/* 019 */   private long range_partitionEnd;
/* 020 */   private long range_number;
/* 021 */   private boolean range_overflow;
/* 022 */   private scala.collection.Iterator range_input;
/* 023 */   private UnsafeRow range_result;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
/* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
/* 026 */   private UnsafeRow agg_result;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 028 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 029 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner;
/* 030 */   private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows;
/* 031 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue;
/* 032 */
/* 033 */   public GeneratedIterator(Object[] references) {
/* 034 */     this.references = references;
/* 035 */   }
/* 036 */
/* 037 */   public void init(scala.collection.Iterator inputs[]) {
/* 038 */     agg_initAgg = false;
/* 039 */     this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
/* 040 */     agg_hashMap = agg_plan.createHashMap();
/* 041 */
/* 042 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 043 */     range_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) range_numOutputRows.localValue();
/* 044 */     range_initRange = false;
/* 045 */     range_partitionEnd = 0L;
/* 046 */     range_number = 0L;
/* 047 */     range_overflow = false;
/* 048 */     range_input = inputs[0];
/* 049 */     range_result = new UnsafeRow(1);
/* 050 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
/* 051 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
/* 052 */     agg_result = new UnsafeRow(1);
/* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 055 */     agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
/* 056 */     this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 057 */     wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue();
/* 058 */   }
/* 059 */
/* 060 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 061 */     /*** PRODUCE: Range 0, 1, 1, 100, [id#0L] */
/* 062 */
/* 063 */     // initialize Range
/* 064 */     if (!range_initRange) {
/* 065 */       range_initRange = true;
/* 066 */       if (range_input.hasNext()) {
/* 067 */         initRange(((InternalRow) range_input.next()).getInt(0));
/* 068 */       } else {
/* 069 */         return;
/* 070 */       }
/* 071 */     }
/* 072 */
/* 073 */     while (!range_overflow && range_number < range_partitionEnd) {
/* 074 */       long range_value = range_number;
/* 075 */       range_number += 1L;
/* 076 */       if (range_number < range_value ^ 1L < 0) {
/* 077 */         range_overflow = true;
/* 078 */       }
/* 079 */
/* 080 */       /*** CONSUME: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) */
/* 081 */
/* 082 */       // generate grouping key
/* 083 */       agg_rowWriter.write(0, range_value);
/* 084 */       /* hash(input[0, bigint], 42) */
/* 085 */       int agg_value1 = 42;
/* 086 */
/* 087 */       agg_value1 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(range_value, agg_value1);
/* 088 */       UnsafeRow agg_aggBuffer = null;
/* 089 */       if (true) {
/* 090 */         // try to get the buffer from hash map
/* 091 */         agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1);
/* 092 */       }
/* 093 */       if (agg_aggBuffer == null) {
/* 094 */         if (agg_sorter == null) {
/* 095 */           agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 096 */         } else {
/* 097 */           agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 098 */         }
/* 099 */
/* 100 */         // the hash map had be spilled, it should have enough memory now,
/* 101 */         // try  to allocate buffer again.
/* 102 */         agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1);
/* 103 */         if (agg_aggBuffer == null) {
/* 104 */           // failed to allocate the first page
/* 105 */           throw new OutOfMemoryError("No enough memory for aggregation");
/* 106 */         }
/* 107 */       }
/* 108 */
/* 109 */       // evaluate aggregate function
/* 110 */       /* (input[0, bigint] + 1) */
/* 111 */       /* input[0, bigint] */
/* 112 */       long agg_value4 = agg_aggBuffer.getLong(0);
/* 113 */
/* 114 */       long agg_value3 = -1L;
/* 115 */       agg_value3 = agg_value4 + 1L;
/* 116 */       // update aggregate buffer
/* 117 */       agg_aggBuffer.setLong(0, agg_value3);
/* 118 */
/* 119 */       if (shouldStop()) return;
/* 120 */     }
/* 121 */
/* 122 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter);
/* 123 */   }
/* 124 */
/* 125 */   private void initRange(int idx) {
/* 126 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 127 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 128 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(100L);
/* 129 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 130 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 131 */
/* 132 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 133 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 134 */       range_number = Long.MAX_VALUE;
/* 135 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 136 */       range_number = Long.MIN_VALUE;
/* 137 */     } else {
/* 138 */       range_number = st.longValue();
/* 139 */     }
/* 140 */
/* 141 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 142 */     .multiply(step).add(start);
/* 143 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 144 */       range_partitionEnd = Long.MAX_VALUE;
/* 145 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 146 */       range_partitionEnd = Long.MIN_VALUE;
/* 147 */     } else {
/* 148 */       range_partitionEnd = end.longValue();
/* 149 */     }
/* 150 */
/* 151 */     range_metricValue.add((range_partitionEnd - range_number) / 1L);
/* 152 */   }
/* 153 */
/* 154 */   protected void processNext() throws java.io.IOException {
/* 155 */     /*** PRODUCE: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) */
/* 156 */
/* 157 */     if (!agg_initAgg) {
/* 158 */       agg_initAgg = true;
/* 159 */       agg_doAggregateWithKeys();
/* 160 */     }
/* 161 */
/* 162 */     // output the result
/* 163 */     while (agg_mapIter.next()) {
/* 164 */       wholestagecodegen_metricValue.add(1);
/* 165 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 166 */       UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue();
/* 167 */
/* 168 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer1);
/* 169 */
/* 170 */       /*** CONSUME: WholeStageCodegen */
/* 171 */
/* 172 */       append(agg_resultRow);
/* 173 */
/* 174 */       if (shouldStop()) return;
/* 175 */     }
/* 176 */
/* 177 */     agg_mapIter.close();
/* 178 */     if (agg_sorter == null) {
/* 179 */       agg_hashMap.free();
/* 180 */     }
/* 181 */   }
/* 182 */ }

== Subtree 2 / 3 ==
WholeStageCodegen
:  +- Sort [id#0L ASC], true, 0
:     +- INPUT
+- Exchange rangepartitioning(id#0L ASC, 200), None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L])
      :     +- INPUT
      +- Exchange hashpartitioning(id#0L, 200), None
         +- WholeStageCodegen
            :  +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L])
            :     +- Range 0, 1, 1, 100, [id#0L]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [id#0L ASC], true, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */
/* 022 */   public GeneratedIterator(Object[] references) {
/* 023 */     this.references = references;
/* 024 */   }
/* 025 */
/* 026 */   public void init(scala.collection.Iterator inputs[]) {
/* 027 */     sort_needToSort = true;
/* 028 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 029 */     sort_sorter = sort_plan.createSorter();
/* 030 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 031 */
/* 032 */     inputadapter_input = inputs[0];
/* 033 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 034 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 035 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 036 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 037 */   }
/* 038 */
/* 039 */   private void sort_addToSorter() throws java.io.IOException {
/* 040 */     /*** PRODUCE: INPUT */
/* 041 */
/* 042 */     while (inputadapter_input.hasNext()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       /*** CONSUME: Sort [id#0L ASC], true, 0 */
/* 045 */
/* 046 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 047 */       if (shouldStop()) return;
/* 048 */     }
/* 049 */
/* 050 */   }
/* 051 */
/* 052 */   protected void processNext() throws java.io.IOException {
/* 053 */     /*** PRODUCE: Sort [id#0L ASC], true, 0 */
/* 054 */     if (sort_needToSort) {
/* 055 */       sort_addToSorter();
/* 056 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 057 */       sort_sortedIter = sort_sorter.sort();
/* 058 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 059 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 060 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 061 */       sort_needToSort = false;
/* 062 */     }
/* 063 */
/* 064 */     while (sort_sortedIter.hasNext()) {
/* 065 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 066 */
/* 067 */       /*** CONSUME: WholeStageCodegen */
/* 068 */
/* 069 */       append(sort_outputRow);
/* 070 */
/* 071 */       if (shouldStop()) return;
/* 072 */     }
/* 073 */   }
/* 074 */ }

== Subtree 3 / 3 ==
WholeStageCodegen
:  +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L])
:     +- INPUT
+- Exchange hashpartitioning(id#0L, 200), None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L])
      :     +- Range 0, 1, 1, 100, [id#0L]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L])
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean agg_initAgg;
/* 012 */   private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 014 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 015 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private UnsafeRow agg_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 020 */   private UnsafeRow agg_result1;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1;
/* 023 */   private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows;
/* 024 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue;
/* 025 */
/* 026 */   public GeneratedIterator(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(scala.collection.Iterator inputs[]) {
/* 031 */     agg_initAgg = false;
/* 032 */     this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
/* 033 */     agg_hashMap = agg_plan.createHashMap();
/* 034 */
/* 035 */     inputadapter_input = inputs[0];
/* 036 */     agg_result = new UnsafeRow(1);
/* 037 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 038 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 039 */     agg_result1 = new UnsafeRow(2);
/* 040 */     this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 0);
/* 041 */     this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 2);
/* 042 */     this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 043 */     wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue();
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 047 */     /*** PRODUCE: INPUT */
/* 048 */
/* 049 */     while (inputadapter_input.hasNext()) {
/* 050 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 051 */       /*** CONSUME: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) */
/* 052 */       /* input[0, bigint] */
/* 053 */       long inputadapter_value = inputadapter_row.getLong(0);
/* 054 */       /* input[1, bigint] */
/* 055 */       long inputadapter_value1 = inputadapter_row.getLong(1);
/* 056 */
/* 057 */       // generate grouping key
/* 058 */       agg_rowWriter.write(0, inputadapter_value);
/* 059 */       /* hash(input[0, bigint], 42) */
/* 060 */       int agg_value1 = 42;
/* 061 */
/* 062 */       agg_value1 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(inputadapter_value, agg_value1);
/* 063 */       UnsafeRow agg_aggBuffer = null;
/* 064 */       if (true) {
/* 065 */         // try to get the buffer from hash map
/* 066 */         agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1);
/* 067 */       }
/* 068 */       if (agg_aggBuffer == null) {
/* 069 */         if (agg_sorter == null) {
/* 070 */           agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 071 */         } else {
/* 072 */           agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 073 */         }
/* 074 */
/* 075 */         // the hash map had be spilled, it should have enough memory now,
/* 076 */         // try  to allocate buffer again.
/* 077 */         agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1);
/* 078 */         if (agg_aggBuffer == null) {
/* 079 */           // failed to allocate the first page
/* 080 */           throw new OutOfMemoryError("No enough memory for aggregation");
/* 081 */         }
/* 082 */       }
/* 083 */
/* 084 */       // evaluate aggregate function
/* 085 */       /* (input[0, bigint] + input[2, bigint]) */
/* 086 */       /* input[0, bigint] */
/* 087 */       long agg_value4 = agg_aggBuffer.getLong(0);
/* 088 */
/* 089 */       long agg_value3 = -1L;
/* 090 */       agg_value3 = agg_value4 + inputadapter_value1;
/* 091 */       // update aggregate buffer
/* 092 */       agg_aggBuffer.setLong(0, agg_value3);
/* 093 */       if (shouldStop()) return;
/* 094 */     }
/* 095 */
/* 096 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter);
/* 097 */   }
/* 098 */
/* 099 */   protected void processNext() throws java.io.IOException {
/* 100 */     /*** PRODUCE: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) */
/* 101 */
/* 102 */     if (!agg_initAgg) {
/* 103 */       agg_initAgg = true;
/* 104 */       agg_doAggregateWithKeys();
/* 105 */     }
/* 106 */
/* 107 */     // output the result
/* 108 */     while (agg_mapIter.next()) {
/* 109 */       wholestagecodegen_metricValue.add(1);
/* 110 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 111 */       UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue();
/* 112 */
/* 113 */       /* input[0, bigint] */
/* 114 */       long agg_value6 = agg_aggKey.getLong(0);
/* 115 */       /* input[0, bigint] */
/* 116 */       long agg_value7 = agg_aggBuffer1.getLong(0);
/* 117 */
/* 118 */       /*** CONSUME: WholeStageCodegen */
/* 119 */
/* 120 */       agg_rowWriter1.write(0, agg_value6);
/* 121 */
/* 122 */       agg_rowWriter1.write(1, agg_value7);
/* 123 */       append(agg_result1);
/* 124 */
/* 125 */       if (shouldStop()) return;
/* 126 */     }
/* 127 */
/* 128 */     agg_mapIter.close();
/* 129 */     if (agg_sorter == null) {
/* 130 */       agg_hashMap.free();
/* 131 */     }
/* 132 */   }
/* 133 */ }
```

rxin

Author: Eric Liang <ekl@databricks.com>

Closes #12025 from ericl/spark-14227.
2016-03-29 13:31:51 -07:00
Wenchen Fan 38326cad87 [SPARK-14205][SQL] remove trait Queryable
## What changes were proposed in this pull request?

After DataFrame and Dataset are merged, the trait `Queryable` becomes unnecessary as it has only one implementation. We should remove it.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12001 from cloud-fan/df-ds.
2016-03-28 18:53:47 -07:00
Herman van Hovell 328c71161b [SPARK-14086][SQL] Add DDL commands to ANTLR4 parser
#### What changes were proposed in this pull request?

This PR adds all the current Spark SQL DDL commands to the new ANTLR 4 based SQL parser.

I have found a few inconsistencies in the current commands:
- Function has an alias field. This is actually the class name of the function.
- Partition specifications should contain nulls in some commands, and contain `None`s in others.
- `AlterTableSkewedLocation`: Should defines which columns have skewed values, and should allow us to define storage for each skewed combination of values. We currently only allow one value per field.
- `AlterTableSetFileFormat`: Should only have one file format, it currently supports both.

I have implemented all these comments like they were, and I propose to improve them in follow-up PRs.

#### How was this patch tested?

The existing DDLCommandSuite.

cc rxin andrewor14 yhuai

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12011 from hvanhovell/SPARK-14086.
2016-03-28 16:22:02 -07:00
Davies Liu d7b58f1461 [SPARK-14052] [SQL] build a BytesToBytesMap directly in HashedRelation
## What changes were proposed in this pull request?

Currently, for the key that can not fit within a long,  we build a hash map for UnsafeHashedRelation, it's converted to BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly to have better memory efficiency.

In order to do that, BytesToBytesMap should support multiple (K,V) pair with the same K,  Location.putNewKey() is renamed to Location.append(), which could append multiple values for the same key (same Location). `Location.newValue()` is added to find the next value for the same key.

## How was this patch tested?

Existing tests. Added benchmark for broadcast hash join with duplicated keys.

Author: Davies Liu <davies@databricks.com>

Closes #11870 from davies/map2.
2016-03-28 13:07:32 -07:00
Herman van Hovell 600c0b69ca [SPARK-13713][SQL] Migrate parser from ANTLR3 to ANTLR4
### What changes were proposed in this pull request?
The current ANTLR3 parser is quite complex to maintain and suffers from code blow-ups. This PR introduces a new parser that is based on ANTLR4.

This parser is based on the [Presto's SQL parser](https://github.com/facebook/presto/blob/master/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4). The current implementation can parse and create Catalyst and SQL plans. Large parts of the HiveQl DDL and some of the DML functionality is currently missing, the plan is to add this in follow-up PRs.

This PR is a work in progress, and work needs to be done in the following area's:

- [x] Error handling should be improved.
- [x] Documentation should be improved.
- [x] Multi-Insert needs to be tested.
- [ ] Naming and package locations.

### How was this patch tested?

Catalyst and SQL unit tests.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #11557 from hvanhovell/ngParser.
2016-03-28 12:31:12 -07:00
gatorsmile a01b6a92b5 [SPARK-14177][SQL] Native Parsing for DDL Command "Describe Database" and "Alter Database"
#### What changes were proposed in this pull request?

This PR is to provide native parsing support for two DDL commands:  ```Describe Database``` and ```Alter Database Set Properties```

Based on the Hive DDL document:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

##### 1. ALTER DATABASE
**Syntax:**
```SQL
ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
```
 - `ALTER DATABASE` is to add new (key, value) pairs into `DBPROPERTIES`

##### 2. DESCRIBE DATABASE
**Syntax:**
```SQL
DESCRIBE DATABASE [EXTENDED] db_name
```
 - `DESCRIBE DATABASE` shows the name of the database, its comment (if one has been set), and its root location on the filesystem. When `extended` is true, it also shows the database's properties

#### How was this patch tested?
Added the related test cases to `DDLCommandSuite`

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

This patch had conflicts when merged, resolved by
Committer: Yin Huai <yhuai@databricks.com>

Closes #11977 from gatorsmile/parseAlterDatabase.
2016-03-26 20:12:30 -07:00
Liang-Chi Hsieh bc925b73a6 [SPARK-14157][SQL] Parse Drop Function DDL command
## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-14157

We only parse create function command. In order to support native drop function command, we need to parse it too.

From Hive [manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/ReloadFunction), the drop function command has syntax as:

DROP [TEMPORARY] FUNCTION [IF EXISTS] function_name;

## How was this patch tested?

Added test into `DDLCommandSuite`.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11959 from viirya/parse-drop-func.
2016-03-26 20:09:01 -07:00
gatorsmile 8989d3a396 [SPARK-14161][SQL] Native Parsing for DDL Command Drop Database
### What changes were proposed in this pull request?
Based on the Hive DDL document https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

The syntax of DDL command for Drop Database is
```SQL
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];
```
 - If `IF EXISTS` is not specified, the default behavior is to issue a warning message if `database_name` does't exist
 - `RESTRICT` is the default behavior.

This PR is to provide a native parsing support for `DROP DATABASE`.

#### How was this patch tested?

Added a test case `DDLCommandSuite`

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11962 from gatorsmile/parseDropDatabase.
2016-03-26 14:11:13 -07:00