Commit graph

3769 commits

Author SHA1 Message Date
wangyang 026eb90644 [SPARK-15875] Try to use Seq.isEmpty and Seq.nonEmpty instead of Seq.length == 0 and Seq.length > 0
## What changes were proposed in this pull request?

In scala, immutable.List.length is an expensive operation so we should
avoid using Seq.length == 0 or Seq.lenth > 0, and use Seq.isEmpty and Seq.nonEmpty instead.

## How was this patch tested?
existing tests

Author: wangyang <wangyang@haizhi.com>

Closes #13601 from yangw1234/isEmpty.
2016-06-10 13:10:03 -07:00
Sandeep Singh 865ec32dd9 [MINOR][X][X] Replace all occurrences of None: Option with Option.empty
## What changes were proposed in this pull request?
Replace all occurrences of `None: Option[X]` with `Option.empty[X]`

## How was this patch tested?
Exisiting Tests

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #13591 from techaddict/minor-7.
2016-06-10 13:06:51 -07:00
Takuya UESHIN 667d4ea7b3 [SPARK-6320][SQL] Move planLater method into GenericStrategy.
## What changes were proposed in this pull request?

This PR moves `QueryPlanner.planLater()` method into `GenericStrategy` for extra strategies to be able to use `planLater` in its strategy.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13147 from ueshin/issues/SPARK-6320.
2016-06-10 13:06:18 -07:00
Liwei Lin fb219029dd [SPARK-15871][SQL] Add assertNotPartitioned check in DataFrameWriter
## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters, when we write data out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s.

This patch adds `assertNotPartitioned` check in `DataFrameWriter`.

<table>
<tr>
	<td align="center"><strong>operation</strong></td>
	<td align="center"><strong>should check not partitioned?</strong></td>
</tr>
<tr>
	<td align="center">mode</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">outputMode</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">trigger</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">format</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">option/options</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">partitionBy</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">bucketBy</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">sortBy</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">save</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">queryName</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">startStream</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">foreach</td>
	<td align="center">yes</td>
</tr>
<tr>
	<td align="center">insertInto</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">saveAsTable</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">jdbc</td>
	<td align="center">yes</td>
</tr>
<tr>
	<td align="center">json</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">parquet</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">orc</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">text</td>
	<td align="center"></td>
</tr>
<tr>
	<td align="center">csv</td>
	<td align="center"></td>
</tr>
</table>

## How was this patch tested?

New dedicated tests.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #13597 from lw-lin/add-assertNotPartitioned.
2016-06-10 13:01:29 -07:00
Dongjoon Hyun 2413fce9d6 [SPARK-15743][SQL] Prevent saving with all-column partitioning
## What changes were proposed in this pull request?

When saving datasets on storage, `partitionBy` provides an easy way to construct the directory structure. However, if a user choose all columns as partition columns, some exceptions occurs.

- **ORC with all column partitioning**: `AnalysisException` on **future read** due to schema inference failure.
 ```scala
scala> spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

scala> spark.read.format("orc").load("/tmp/data").collect()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at /tmp/data. It must be specified manually;
```

- **Parquet with all-column partitioning**: `InvalidSchemaException` on **write execution** due to Parquet limitation.
 ```scala
scala> spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
[Stage 0:>                                                          (0 + 8) / 8]16/06/02 16:51:17
ERROR Utils: Aborting task
org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema
... (lots of error messages)
```

Although some formats like JSON support all-column partitioning without any problem, it seems not a good idea to make lots of empty directories.

This PR prevents saving with all-column partitioning by consistently raising `AnalysisException` before executing save operation.

## How was this patch tested?

Newly added `PartitioningUtilsSuite`.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13486 from dongjoon-hyun/SPARK-15743.
2016-06-10 12:43:27 -07:00
Reynold Xin 254bc8c34e [SPARK-15866] Rename listAccumulator collectionAccumulator
## What changes were proposed in this pull request?
SparkContext.listAccumulator, by Spark's convention, makes it sound like "list" is a verb and the method should return a list of accumulators. This patch renames the method and the class collection accumulator.

## How was this patch tested?
Updated test case to reflect the names.

Author: Reynold Xin <rxin@databricks.com>

Closes #13594 from rxin/SPARK-15866.
2016-06-10 11:08:39 -07:00
Liang-Chi Hsieh 0ec279ffdf [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter
## What changes were proposed in this pull request?

This patch moves some codes in `DataFrameWriter.insertInto` that belongs to `Analyzer`.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #13496 from viirya/move-analyzer-stuff.
2016-06-10 11:05:04 -07:00
Tathagata Das abdb5d42c5 [SPARK-15812][SQ][STREAMING] Added support for sorting after streaming aggregation with complete mode
## What changes were proposed in this pull request?

When the output mode is complete, then the output of a streaming aggregation essentially will contain the complete aggregates every time. So this is not different from a batch dataset within an incremental execution. Other non-streaming operations should be supported on this dataset. In this PR, I am just adding support for sorting, as it is a common useful functionality. Support for other operations will come later.

## How was this patch tested?
Additional unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13549 from tdas/SPARK-15812.
2016-06-10 10:48:28 -07:00
Shixiong Zhu 00c310133d [SPARK-15593][SQL] Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery
## What changes were proposed in this pull request?

* Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery
  * ForeachWriter is the interface for the user to consume partitions of data
* Add a type parameter T to DataFrameWriter

Usage
```Scala
val ds = spark.read....stream().as[String]
ds.....write
         .queryName(...)
        .option("checkpointLocation", ...)
        .foreach(new ForeachWriter[Int] {
          def open(partitionId: Long, version: Long): Boolean = {
             // prepare some resources for a partition
             // check `version` if possible and return `false` if this is a duplicated data to skip the data processing.
          }

          override def process(value: Int): Unit = {
              // process data
          }

          def close(errorOrNull: Throwable): Unit = {
             // release resources for a partition
             // check `errorOrNull` and handle the error if necessary.
          }
        })
```

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13342 from zsxwing/foreach.
2016-06-10 00:11:46 -07:00
Dongjoon Hyun 5a3533e779 [SPARK-15696][SQL] Improve crosstab to have a consistent column order
## What changes were proposed in this pull request?

Currently, `crosstab` returns a Dataframe having **random-order** columns obtained by just `distinct`. Also, the documentation of `crosstab` shows the result in a sorted order which is different from the current implementation. This PR explicitly constructs the columns in a sorted order in order to improve user experience. Also, this implementation gives the same result with the documentation.

**Before**
```scala
scala> spark.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), (3, 3))).toDF("key", "value").stat.crosstab("key", "value").show()
+---------+---+---+---+
|key_value|  3|  2|  1|
+---------+---+---+---+
|        2|  1|  0|  2|
|        1|  0|  1|  1|
|        3|  1|  1|  0|
+---------+---+---+---+

scala> spark.createDataFrame(Seq((1, "a"), (1, "b"), (2, "a"), (2, "a"), (2, "c"), (3, "b"), (3, "c"))).toDF("key", "value").stat.crosstab("key", "value").show()
+---------+---+---+---+
|key_value|  c|  a|  b|
+---------+---+---+---+
|        2|  1|  2|  0|
|        1|  0|  1|  1|
|        3|  1|  0|  1|
+---------+---+---+---+
```

**After**
```scala
scala> spark.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), (3, 3))).toDF("key", "value").stat.crosstab("key", "value").show()
+---------+---+---+---+
|key_value|  1|  2|  3|
+---------+---+---+---+
|        2|  2|  0|  1|
|        1|  1|  1|  0|
|        3|  0|  1|  1|
+---------+---+---+---+
scala> spark.createDataFrame(Seq((1, "a"), (1, "b"), (2, "a"), (2, "a"), (2, "c"), (3, "b"), (3, "c"))).toDF("key", "value").stat.crosstab("key", "value").show()
+---------+---+---+---+
|key_value|  a|  b|  c|
+---------+---+---+---+
|        2|  2|  0|  1|
|        1|  1|  1|  0|
|        3|  0|  1|  1|
+---------+---+---+---+
```

## How was this patch tested?

Pass the Jenkins tests with updated testcases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13436 from dongjoon-hyun/SPARK-15696.
2016-06-09 22:46:51 -07:00
Eric Liang 6c5fd977fb [SPARK-15791] Fix NPE in ScalarSubquery
## What changes were proposed in this pull request?

The fix is pretty simple, just don't make the executedPlan transient in `ScalarSubquery` since it is referenced at execution time.

## How was this patch tested?

I verified the fix manually in non-local mode. It's not clear to me why the problem did not manifest in local mode, any suggestions?

cc davies

Author: Eric Liang <ekl@databricks.com>

Closes #13569 from ericl/fix-scalar-npe.
2016-06-09 22:28:31 -07:00
Reynold Xin 16df133d7f [SPARK-15850][SQL] Remove function grouping in SparkSession
## What changes were proposed in this pull request?
SparkSession does not have that many functions due to better namespacing, and as a result we probably don't need the function grouping. This patch removes the grouping and also adds missing scaladocs for createDataset functions in SQLContext.

Closes #13577.

## How was this patch tested?
N/A - this is a documentation change.

Author: Reynold Xin <rxin@databricks.com>

Closes #13582 from rxin/SPARK-15850.
2016-06-09 18:58:24 -07:00
Shixiong Zhu 4d9d9cc585 [SPARK-15853][SQL] HDFSMetadataLog.get should close the input stream
## What changes were proposed in this pull request?

This PR closes the input stream created in `HDFSMetadataLog.get`

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13583 from zsxwing/leak.
2016-06-09 18:45:19 -07:00
Eric Liang b914e1930f [SPARK-15794] Should truncate toString() of very wide plans
## What changes were proposed in this pull request?

With very wide tables, e.g. thousands of fields, the plan output is unreadable and often causes OOMs due to inefficient string processing. This truncates all struct and operator field lists to a user configurable threshold to limit performance impact.

It would also be nice to optimize string generation to avoid these sort of O(n^2) slowdowns entirely (i.e. use StringBuilder everywhere including expressions), but this is probably too large of a change for 2.0 at this point, and truncation has other benefits for usability.

## How was this patch tested?

Added a microbenchmark that covers this case particularly well. I also ran the microbenchmark while varying the truncation threshold.

```
numFields = 5
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)            2336 / 2558          0.0       23364.4       0.1X

numFields = 25
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)            4237 / 4465          0.0       42367.9       0.1X

numFields = 100
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)          10458 / 11223          0.0      104582.0       0.0X

numFields = Infinity
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
[info]   java.lang.OutOfMemoryError: Java heap space
```

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #13537 from ericl/truncated-string.
2016-06-09 18:05:16 -07:00
Herman van Hovell b0768538e5 [SPARK-14321][SQL] Reduce date format cost and string-to-date cost in date functions
## What changes were proposed in this pull request?
The current implementations of `UnixTime` and `FromUnixTime` do not cache their parser/formatter as much as they could. This PR resolved this issue.

This PR is a take over from https://github.com/apache/spark/pull/13522 and further optimizes the re-use of the parser/formatter. It also fixes the improves handling (catching the actual exception instead of `Throwable`). All credits for this work should go to rajeshbalamohan.

This PR closes https://github.com/apache/spark/pull/13522

## How was this patch tested?
Current tests.

Author: Herman van Hovell <hvanhovell@databricks.com>
Author: Rajesh Balamohan <rbalamohan@apache.org>

Closes #13581 from hvanhovell/SPARK-14321.
2016-06-09 16:37:18 -07:00
Kevin Yu 99386fe398 [SPARK-15804][SQL] Include metadata in the toStructType
## What changes were proposed in this pull request?
The help function 'toStructType' in the AttributeSeq class doesn't include the metadata when it builds the StructField, so it causes this reported problem https://issues.apache.org/jira/browse/SPARK-15804?jql=project%20%3D%20SPARK when spark writes the the dataframe with the metadata to the parquet datasource.

The code path is when spark writes the dataframe to the parquet datasource through the InsertIntoHadoopFsRelationCommand, spark will build the WriteRelation container, and it will call the help function 'toStructType' to create StructType which contains StructField, it should include the metadata there, otherwise, we will lost the user provide metadata.

## How was this patch tested?

added test case in ParquetQuerySuite.scala

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: Kevin Yu <qyu@us.ibm.com>

Closes #13555 from kevinyu98/spark-15804.
2016-06-09 09:50:09 -07:00
Sandeep Singh d5807def10 [MINOR][DOC] In Dataset docs, remove self link to Dataset and add link to Column
## What changes were proposed in this pull request?
Documentation Fix

## How was this patch tested?

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #13567 from techaddict/minor-4.
2016-06-08 23:41:29 -07:00
Wenchen Fan afbe35cf5b [SPARK-14670] [SQL] allow updating driver side sql metrics
## What changes were proposed in this pull request?

On the SparkUI right now we have this SQLTab that displays accumulator values per operator. However, it only displays metrics updated on the executors, not on the driver. It is useful to also include driver metrics, e.g. broadcast time.

This is a different version from https://github.com/apache/spark/pull/12427. This PR sends driver side accumulator updates right after the updating happens, not at the end of execution, by a new event.

## How was this patch tested?

new test in `SQLListenerSuite`

![qq20160606-0](https://cloud.githubusercontent.com/assets/3182036/15841418/0eb137da-2c06-11e6-9068-5694eeb78530.png)

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13189 from cloud-fan/metrics.
2016-06-08 22:47:29 -07:00
Sandeep Singh f958c1c3e2 [MINOR] Fix Java Lint errors introduced by #13286 and #13280
## What changes were proposed in this pull request?

revived #13464

Fix Java Lint errors introduced by #13286 and #13280
Before:
```
Using `mvn` from path: /Users/pichu/Project/spark/build/apache-maven-3.3.9/bin/mvn
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[340,5] (whitespace) FileTabCharacter: Line contains a tab character.
[ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[341,5] (whitespace) FileTabCharacter: Line contains a tab character.
[ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[342,5] (whitespace) FileTabCharacter: Line contains a tab character.
[ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[343,5] (whitespace) FileTabCharacter: Line contains a tab character.
[ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[41,28] (naming) MethodName: Method name 'Append' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[52,28] (naming) MethodName: Method name 'Complete' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[61,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.PrimitiveType.
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[62,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.Type.
```

## How was this patch tested?
ran `dev/lint-java` locally

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #13559 from techaddict/minor-3.
2016-06-08 14:51:00 +01:00
Herman van Hovell 91fbc880b6 [SPARK-15789][SQL] Allow reserved keywords in most places
## What changes were proposed in this pull request?
The parser currently does not allow the use of some SQL keywords as table or field names. This PR adds supports for all keywords as identifier. The exception to this are table aliases, in this case most keywords are allowed except for join keywords (```anti, full, inner, left, semi, right, natural, on, join, cross```) and set-operator keywords (```union, intersect, except```).

## How was this patch tested?
I have added/move/renamed test in the catalyst `*ParserSuite`s.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #13534 from hvanhovell/SPARK-15789.
2016-06-07 17:01:11 -07:00
Shixiong Zhu 0cfd6192f3 [SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable
## What changes were proposed in this pull request?

This PR adds ContinuousQueryInfo to make ContinuousQueryListener events serializable in order to support writing events into the event log.

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13335 from zsxwing/query-info.
2016-06-07 16:40:03 -07:00
Sean Zhong 890baaca50 [SPARK-15674][SQL] Deprecates "CREATE TEMPORARY TABLE USING...", uses "CREAT TEMPORARY VIEW USING..." instead
## What changes were proposed in this pull request?

The current implementation of "CREATE TEMPORARY TABLE USING datasource..." is NOT creating any intermediate temporary data directory like temporary HDFS folder, instead, it only stores a SQL string in memory. Probably we should use "TEMPORARY VIEW" instead.

This PR assumes a temporary table has to link with some temporary intermediate data. It follows the definition of temporary table like this (from [hortonworks doc](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_dataintegration/content/temp-tables.html)):
> A temporary table is a convenient way for an application to automatically manage intermediate data generated during a complex query

**Example**:

```
scala> spark.sql("CREATE temporary view  my_tab7 (c1: String, c2: String)  USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat  OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
scala> spark.sql("select c1, c2 from my_tab7").show()
+----+-----+
|  c1|   c2|
+----+-----+
|year| make|
|2012|Tesla|
...
```

It NOW prints a **deprecation warning** if "CREATE TEMPORARY TABLE USING..." is used.

```
scala> spark.sql("CREATE temporary table  my_tab7 (c1: String, c2: String)  USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat  OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
16/05/31 10:39:27 WARN SparkStrategies$DDLStrategy: CREATE TEMPORARY TABLE tableName USING... is deprecated, please use CREATE TEMPORARY VIEW viewName USING... instead
```

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13414 from clockfly/create_temp_view_using.
2016-06-07 15:21:55 -07:00
Sean Zhong 5f731d6859 [SPARK-15792][SQL] Allows operator to change the verbosity in explain output
## What changes were proposed in this pull request?

This PR allows customization of verbosity in explain output. After change, `dataframe.explain()` and `dataframe.explain(true)` has different verbosity output for physical plan.

Currently, this PR only enables verbosity string for operator `HashAggregateExec` and `SortAggregateExec`. We will gradually enable verbosity string for more operators in future.

**Less verbose mode:** dataframe.explain(extended = false)

`output=[count(a)#85L]` is **NOT** displayed for HashAggregate.

```
scala> Seq((1,2,3)).toDF("a", "b", "c").createTempView("df2")
scala> spark.sql("select count(a) from df2").explain()
== Physical Plan ==
*HashAggregate(key=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(key=[], functions=[partial_count(1)])
      +- LocalTableScan
```

**Verbose mode:** dataframe.explain(extended = true)

`output=[count(a)#85L]` is displayed for HashAggregate.

```
scala> spark.sql("select count(a) from df2").explain(true)  // "output=[count(a)#85L]" is added
...
== Physical Plan ==
*HashAggregate(key=[], functions=[count(1)], output=[count(a)#85L])
+- Exchange SinglePartition
   +- *HashAggregate(key=[], functions=[partial_count(1)], output=[count#87L])
      +- LocalTableScan
```

## How was this patch tested?

Manual test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13535 from clockfly/verbose_breakdown_2.
2016-06-06 22:59:25 -07:00
Sean Zhong 0e0904a2fc [SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema
## What changes were proposed in this pull request?

This PR makes sure the typed Filter doesn't change the Dataset schema.

**Before the change:**

```
scala> val df = spark.range(0,9)
scala> df.schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
scala> val afterFilter = df.filter(_=>true)
scala> afterFilter.schema   // !!! schema is CHANGED!!! Column name is changed from id to value, nullable is changed from false to true.
res13: org.apache.spark.sql.types.StructType = StructType(StructField(value,LongType,true))

```

SerializeFromObject and DeserializeToObject are inserted to wrap the Filter, and these two can possibly change the schema of Dataset.

**After the change:**

```
scala> afterFilter.schema   // schema is NOT changed.
res47: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
```

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13529 from clockfly/spark-15632.
2016-06-06 22:40:21 -07:00
Josh Rosen 0b8d694999 [SPARK-15764][SQL] Replace N^2 loop in BindReferences
BindReferences contains a n^2 loop which causes performance issues when operating over large schemas: to determine the ordinal of an attribute reference, we perform a linear scan over the `input` array. Because input can sometimes be a `List`, the call to `input(ordinal).nullable` can also be O(n).

Instead of performing a linear scan, we can convert the input into an array and build a hash map to map from expression ids to ordinals. The greater up-front cost of the map construction is offset by the fact that an expression can contain multiple attribute references, so the cost of the map construction is amortized across a number of lookups.

Perf. benchmarks to follow. /cc ericl

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13505 from JoshRosen/bind-references-improvement.
2016-06-06 11:44:51 -07:00
Zheng RuiFeng fd8af39713 [MINOR] Fix Typos 'an -> a'
## What changes were proposed in this pull request?

`an -> a`

Use cmds like `find . -name '*.R' | xargs -i sh -c "grep -in ' an [^aeiou]' {} && echo {}"` to generate candidates, and review them one by one.

## How was this patch tested?
manual tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #13515 from zhengruifeng/an_a.
2016-06-06 09:35:47 +01:00
Reynold Xin 32f2f95dbd Revert "[SPARK-15585][SQL] Fix NULL handling along with a spark-csv behaivour"
This reverts commit b7e8d1cb3c.
2016-06-05 23:40:13 -07:00
Takeshi YAMAMURO b7e8d1cb3c [SPARK-15585][SQL] Fix NULL handling along with a spark-csv behaivour
## What changes were proposed in this pull request?
This pr fixes the behaviour of `format("csv").option("quote", null)` along with one of spark-csv.
Also, it explicitly sets default values for CSV options in python.

## How was this patch tested?
Added tests in CSVSuite.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #13372 from maropu/SPARK-15585.
2016-06-05 23:35:04 -07:00
Hiroshi Inoue 79268aa461 [SPARK-15704][SQL] add a test case in DatasetAggregatorSuite for regression testing
## What changes were proposed in this pull request?

This change fixes a crash in TungstenAggregate while executing "Dataset complex Aggregator" test case due to IndexOutOfBoundsException.

jira entry for detail: https://issues.apache.org/jira/browse/SPARK-15704

## How was this patch tested?
Using existing unit tests (including DatasetBenchmark)

Author: Hiroshi Inoue <inouehrs@jp.ibm.com>

Closes #13446 from inouehrs/fix_aggregate.
2016-06-05 20:10:33 -07:00
Josh Rosen 26c1089c37 [SPARK-15748][SQL] Replace inefficient foldLeft() call with flatMap() in PartitionStatistics
`PartitionStatistics` uses `foldLeft` and list concatenation (`++`) to flatten an iterator of lists, but this is extremely inefficient compared to simply doing `flatMap`/`flatten` because it performs many unnecessary object allocations. Simply replacing this `foldLeft` by a `flatMap` results in decent performance gains when constructing PartitionStatistics instances for tables with many columns.

This patch fixes this and also makes two similar changes in MLlib and streaming to try to fix all known occurrences of this pattern.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13491 from JoshRosen/foldleft-to-flatmap.
2016-06-05 16:51:00 -07:00
Wenchen Fan 30c4774f33 [SPARK-15657][SQL] RowEncoder should validate the data type of input object
## What changes were proposed in this pull request?

This PR improves the error handling of `RowEncoder`. When we create a `RowEncoder` with a given schema, we should validate the data type of input object. e.g. we should throw an exception when a field is boolean but is declared as a string column.

This PR also removes the support to use `Product` as a valid external type of struct type.  This support is added at https://github.com/apache/spark/pull/9712, but is incomplete, e.g. nested product, product in array are both not working.  However, we never officially support this feature and I think it's ok to ban it.

## How was this patch tested?

new tests in `RowEncoderSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13401 from cloud-fan/bug.
2016-06-05 15:59:52 -07:00
Weiqing Yang 0f307db5e1 [SPARK-15707][SQL] Make Code Neat - Use map instead of if check.
## What changes were proposed in this pull request?
In forType function of object RandomDataGenerator, the code following:
if (maybeSqlTypeGenerator.isDefined){
  ....
  Some(generator)
} else{
 None
}
will be changed. Instead, maybeSqlTypeGenerator.map will be used.

## How was this patch tested?
All of the current unit tests passed.

Author: Weiqing Yang <yangweiqing001@gmail.com>

Closes #13448 from Sherry302/master.
2016-06-04 22:44:03 +01:00
Josh Rosen 091f81e1f7 [SPARK-15762][SQL] Cache Metadata & StructType hashCodes; use singleton Metadata.empty
We should cache `Metadata.hashCode` and use a singleton for `Metadata.empty` because calculating metadata hashCodes appears to be a bottleneck for certain workloads.

We should also cache `StructType.hashCode`.

In an optimizer stress-test benchmark run by ericl, these `hashCode` calls accounted for roughly 40% of the total CPU time and this bottleneck was completely eliminated by the caching added by this patch.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13504 from JoshRosen/metadata-fix.
2016-06-04 14:14:50 -07:00
Lianhui Wang 2ca563cc45 [SPARK-15756][SQL] Support command 'create table stored as orcfile/parquetfile/avrofile'
## What changes were proposed in this pull request?
Now Spark SQL can support 'create table src stored as orc/parquet/avro' for orc/parquet/avro table. But Hive can support  both commands: ' stored as orc/parquet/avro' and 'stored as orcfile/parquetfile/avrofile'.
So this PR supports these keywords 'orcfile/parquetfile/avrofile' in Spark SQL.

## How was this patch tested?
add unit tests

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #13500 from lianhuiwang/SPARK-15756.
2016-06-03 22:19:22 -07:00
Davies Liu 3074f575a3 [SPARK-15391] [SQL] manage the temporary memory of timsort
## What changes were proposed in this pull request?

Currently, the memory for temporary buffer used by TimSort is always allocated as on-heap without bookkeeping, it could cause OOM both in on-heap and off-heap mode.

This PR will try to manage that by preallocate it together with the pointer array, same with RadixSort. It both works for on-heap and off-heap mode.

This PR also change the loadFactor of BytesToBytesMap to 0.5 (it was 0.70), it enables use to radix sort also makes sure that we have enough memory for timsort.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #13318 from davies/fix_timsort.
2016-06-03 16:45:09 -07:00
Andrew Or b1cc7da3e3 [SPARK-15722][SQL] Disallow specifying schema in CTAS statement
## What changes were proposed in this pull request?

As of this patch, the following throws an exception because the schemas may not match:
```
CREATE TABLE students (age INT, name STRING) AS SELECT * FROM boxes
```
but this is OK:
```
CREATE TABLE students AS SELECT * FROM boxes
```

## How was this patch tested?

SQLQuerySuite, HiveDDLCommandSuite

Author: Andrew Or <andrew@databricks.com>

Closes #13490 from andrewor14/ctas-no-column.
2016-06-03 14:39:41 -07:00
Wenchen Fan 11c83f83d5 [SPARK-15140][SQL] make the semantics of null input object for encoder clear
## What changes were proposed in this pull request?

For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL doesn't allow row to be null, only its columns can be null.

This PR explicitly add this constraint and throw exception if users break it.

## How was this patch tested?

several new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13469 from cloud-fan/null-object.
2016-06-03 14:28:19 -07:00
Wenchen Fan 61b80d552a [SPARK-15547][SQL] nested case class in encoder can have different number of fields from the real schema
## What changes were proposed in this pull request?

There are 2 kinds of `GetStructField`:

1. resolved from `UnresolvedExtractValue`, and it will have a `name` property.
2. created when we build deserializer expression for nested tuple, no `name` property.

When we want to validate the ordinals of nested tuple, we should only catch `GetStructField` without the name property.

## How was this patch tested?

new test in `EncoderResolutionSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13474 from cloud-fan/ordinal-check.
2016-06-03 14:26:24 -07:00
gatorsmile eb10b481ca [SPARK-15286][SQL] Make the output readable for EXPLAIN CREATE TABLE and DESC EXTENDED
#### What changes were proposed in this pull request?
Before this PR, the output of EXPLAIN of following SQL is like

```SQL
CREATE EXTERNAL TABLE extTable_with_partitions (key INT, value STRING)
PARTITIONED BY (ds STRING, hr STRING)
LOCATION '/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-b39a6185-8981-403b-a4aa-36fb2f4ca8a9'
```
``ExecutedCommand CreateTableCommand CatalogTable(`extTable_with_partitions`,CatalogTableType(EXTERNAL),CatalogStorageFormat(Some(/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-dd234718-e85d-4c5a-8353-8f1834ac0323),Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(key,int,true,None), CatalogColumn(value,string,true,None), CatalogColumn(ds,string,true,None), CatalogColumn(hr,string,true,None)),List(ds, hr),List(),List(),-1,,1463026413544,-1,Map(),None,None,None), false``

After this PR, the output is like

```
ExecutedCommand
:  +- CreateTableCommand CatalogTable(
	Table:`extTable_with_partitions`
	Created:Thu Jun 02 21:30:54 PDT 2016
	Last Access:Wed Dec 31 15:59:59 PST 1969
	Type:EXTERNAL
	Schema:[`key` int, `value` string, `ds` string, `hr` string]
	Partition Columns:[`ds`, `hr`]
	Storage(Location:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-a06083b8-8e88-4d07-9ff0-d6bd8d943ad3, InputFormat:org.apache.hadoop.mapred.TextInputFormat, OutputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
```

This is also applicable to `DESC EXTENDED`. However, this does not have special handling for Data Source Tables. If needed, we need to move the logics of `DDLUtil`. Let me know if we should do it in this PR. Thanks! rxin liancheng

#### How was this patch tested?
Manual testing

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13070 from gatorsmile/betterExplainCatalogTable.
2016-06-03 13:56:22 -07:00
Josh Rosen e526913989 [SPARK-15742][SQL] Reduce temp collections allocations in TreeNode transform methods
In Catalyst's TreeNode transform methods we end up calling `productIterator.map(...).toArray` in a number of places, which is slightly inefficient because it needs to allocate an `ArrayBuilder` and grow a temporary array. Since we already know the size of the final output (`productArity`), we can simply allocate an array up-front and use a while loop to consume the iterator and populate the array.

For most workloads, this performance difference is negligible but it does make a measurable difference in optimizer performance for queries that operate over very wide schemas (such as the benchmark queries in #13456).

### Perf results (from #13456 benchmarks)

**Before**

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz

parsing large select:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
1 select expressions                            19 /   22          0.0    19119858.0       1.0X
10 select expressions                           23 /   25          0.0    23208774.0       0.8X
100 select expressions                          55 /   73          0.0    54768402.0       0.3X
1000 select expressions                        229 /  259          0.0   228606373.0       0.1X
2500 select expressions                        530 /  554          0.0   529938178.0       0.0X
```

**After**

```
parsing large select:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
1 select expressions                            15 /   21          0.0    14978203.0       1.0X
10 select expressions                           22 /   27          0.0    22492262.0       0.7X
100 select expressions                          48 /   64          0.0    48449834.0       0.3X
1000 select expressions                        189 /  208          0.0   189346428.0       0.1X
2500 select expressions                        429 /  449          0.0   428943897.0       0.0X
```

###

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13484 from JoshRosen/treenode-productiterator-map.
2016-06-03 13:53:02 -07:00
Ioana Delaney 9e2eb13ca5 [SPARK-15677][SQL] Query with scalar sub-query in the SELECT list throws UnsupportedOperationException
## What changes were proposed in this pull request?
Queries with scalar sub-query in the SELECT list run against a local, in-memory relation throw
UnsupportedOperationException exception.

Problem repro:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select (select min(c1) from t2) from t1").show()

java.lang.UnsupportedOperationException: Cannot evaluate expression: scalar-subquery#62 []
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:215)
  at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.eval(subquery.scala:62)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$37.applyOrElse(Optimizer.scala:1473)
```
The problem is specific to local, in memory relations. It is caused by rule ConvertToLocalRelation, which attempts to push down
a scalar-subquery expression to the local tables.

The solution prevents the rule to apply if Project references scalar subqueries.

## How was this patch tested?
Added regression tests to SubquerySuite.scala

Author: Ioana Delaney <ioanamdelaney@gmail.com>

Closes #13418 from ioana-delaney/scalarSubV2.
2016-06-03 12:04:27 -07:00
Wenchen Fan 190ff274fd [SPARK-15494][SQL] encoder code cleanup
## What changes were proposed in this pull request?

Our encoder framework has been evolved a lot, this PR tries to clean up the code to make it more readable and emphasise the concept that encoder should be used as a container of serde expressions.

1. move validation logic to analyzer instead of encoder
2. only have a `resolveAndBind` method in encoder instead of `resolve` and `bind`, as we don't have the encoder life cycle concept anymore.
3. `Dataset` don't need to keep a resolved encoder, as there is no such concept anymore. bound encoder is still needed to do serialization outside of query framework.
4. Using `BoundReference` to represent an unresolved field in deserializer expression is kind of weird, this PR adds a `GetColumnByOrdinal` for this purpose. (serializer expression still use `BoundReference`, we can replace it with `GetColumnByOrdinal` in follow-ups)

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>
Author: Cheng Lian <lian@databricks.com>

Closes #13269 from cloud-fan/clean-encoder.
2016-06-03 00:43:02 -07:00
Dongjoon Hyun b9fcfb3bd1 [SPARK-15744][SQL] Rename two TungstenAggregation*Suites and update codgen/error messages/comments
## What changes were proposed in this pull request?

For consistency, this PR updates some remaining `TungstenAggregation/SortBasedAggregate` after SPARK-15728.
- Update a comment in codegen in `VectorizedHashMapGenerator.scala`.
- `TungstenAggregationQuerySuite` --> `HashAggregationQuerySuite`
- `TungstenAggregationQueryWithControlledFallbackSuite` --> `HashAggregationQueryWithControlledFallbackSuite`
- Update two error messages in `SQLQuerySuite.scala` and `AggregationQuerySuite.scala`.
- Update several comments.

## How was this patch tested?

Manual (Only comment changes and test suite renamings).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13487 from dongjoon-hyun/SPARK-15744.
2016-06-03 00:36:06 -07:00
Sameer Agarwal f7288e166c [SPARK-15745][SQL] Use classloader's getResource() for reading resource files in HiveTests
## What changes were proposed in this pull request?

This is a cleaner approach in general but my motivation behind this change in particular is to be able to run these tests from anywhere without relying on system properties.

## How was this patch tested?

Test only change

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13489 from sameeragarwal/resourcepath.
2016-06-03 00:13:43 -07:00
Xin Wu 76aa45d359 [SPARK-14959][SQL] handle partitioned table directories in distributed filesystem
## What changes were proposed in this pull request?
##### The root cause:
When `DataSource.resolveRelation` is trying to build `ListingFileCatalog` object, `ListLeafFiles` is invoked where a list of `FileStatus` objects are retrieved from the provided path. These FileStatus objects include directories for the partitions (id=0 and id=2 in the jira). However, these directory `FileStatus` objects also try to invoke `getFileBlockLocations` where directory is not allowed for `DistributedFileSystem`, hence the exception happens.

This PR is to remove the block of code that invokes `getFileBlockLocations` for every FileStatus object of the provided path. Instead, we call `HadoopFsRelation.listLeafFiles` directly because this utility method filters out the directories before calling `getFileBlockLocations` for generating `LocatedFileStatus` objects.

## How was this patch tested?
Regtest is run. Manual test:
```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

       spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```
I also tried it with 2 level of partitioning.
I have not found a way to add test case in the unit test bucket that can test a real hdfs file location. Any suggestions will be appreciated.

Author: Xin Wu <xinwu@us.ibm.com>

Closes #13463 from xwu0226/SPARK-14959.
2016-06-02 22:49:17 -07:00
Sean Zhong 6dde27404c [SPARK-15733][SQL] Makes the explain output less verbose by hiding some verbose output like None, null, empty List, and etc.
## What changes were proposed in this pull request?

This PR makes the explain output less verbose by hiding some verbose output like `None`, `null`, empty List `[]`, empty set `{}`, and etc.

**Before change**:

```
== Physical Plan ==
ExecutedCommand
:  +- ShowTablesCommand None, None
```

**After change**:

```
== Physical Plan ==
ExecutedCommand
:  +- ShowTablesCommand
```

## How was this patch tested?

Manual test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13470 from clockfly/verbose_breakdown_4.
2016-06-02 22:45:37 -07:00
Eric Liang 901b2e69ea [SPARK-15724] Add benchmarks for performance over wide schemas
## What changes were proposed in this pull request?

This adds microbenchmarks for tracking performance of queries over very wide or deeply nested DataFrames. It seems performance degrades when DataFrames get thousands of columns wide or hundreds of fields deep.

## How was this patch tested?

Current results included.

cc rxin JoshRosen

Author: Eric Liang <ekl@databricks.com>

Closes #13456 from ericl/sc-3468.
2016-06-02 19:42:05 -07:00
Wenchen Fan 6323e4bd76 [SPARK-15732][SQL] better error message when use java reserved keyword as field name
## What changes were proposed in this pull request?

When users create a case class and use java reserved keyword as field name, spark sql will generate illegal java code and throw exception at runtime.

This PR checks the field names when building the encoder, and if illegal field names are used, throw exception immediately with a good error message.

## How was this patch tested?

new test in DatasetSuite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13485 from cloud-fan/java.
2016-06-02 18:13:04 -07:00
Andrew Or d1c1fbc345 [SPARK-15715][SQL] Fix alter partition with storage information in Hive
## What changes were proposed in this pull request?

This command didn't work for Hive tables. Now it does:
```
ALTER TABLE boxes PARTITION (width=3)
    SET SERDE 'com.sparkbricks.serde.ColumnarSerDe'
    WITH SERDEPROPERTIES ('compress'='true')
```

## How was this patch tested?

`HiveExternalCatalogSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #13453 from andrewor14/alter-partition-storage.
2016-06-02 17:44:48 -07:00
Wenchen Fan f34aadc54c [SPARK-15718][SQL] better error message for writing bucketed data
## What changes were proposed in this pull request?

Currently we don't support bucketing for `save` and `insertInto`.

For `save`, we just write the data out into a directory users specified, and it's not a table, we don't keep its metadata. When we read it back, we have no idea if the data is bucketed or not, so it doesn't make sense to use `save` to write bucketed data, as we can't use the bucket information anyway.

We can support it in the future, once we have features like bucket discovery, or we save bucket information in the data directory too, so that we don't need to rely on a metastore.

For `insertInto`, it inserts data into an existing table, so it doesn't make sense to specify bucket information, as we should get the bucket information from the existing table.

This PR improves the error message for the above 2  cases.
## How was this patch tested?

new test in `BukctedWriteSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13452 from cloud-fan/error-msg.
2016-06-02 17:39:56 -07:00