Commit graph

5608 commits

Author SHA1 Message Date
aokolnychyi f44ead89f4 [SPARK-21538][SQL] Attribute resolution inconsistency in the Dataset API
## What changes were proposed in this pull request?

This PR contains a tiny update that removes an attribute resolution inconsistency in the Dataset API. The following example is taken from the ticket description:

```
spark.range(1).withColumnRenamed("id", "x").sort(col("id"))  // works
spark.range(1).withColumnRenamed("id", "x").sort($"id")  // works
spark.range(1).withColumnRenamed("id", "x").sort('id) // works
spark.range(1).withColumnRenamed("id", "x").sort("id") // fails with:
org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among (x);
```
The above `AnalysisException` happens because the last case calls `Dataset.apply()` to convert strings into columns, which triggers attribute resolution. To make the API consistent between overloaded methods, this PR defers the resolution and constructs columns directly.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #18740 from aokolnychyi/spark-21538.
2017-07-27 16:49:42 -07:00
Wenchen Fan 9f5647d62e [SPARK-21319][SQL] Fix memory leak in sorter
## What changes were proposed in this pull request?

`UnsafeExternalSorter.recordComparator` can be either `KVComparator` or `RowComparator`, and both of them will keep the reference to the input rows they compared last time.

After sorting, we return the sorted iterator to upstream operators. However, the upstream operators may take a while to consume up the sorted iterator, and `UnsafeExternalSorter` is registered to `TaskContext` at [here](https://github.com/apache/spark/blob/v2.2.0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L159-L161), which means we will keep the `UnsafeExternalSorter` instance and keep the last compared input rows in memory until the sorted iterator is consumed up.

Things get worse if we sort within partitions of a dataset and coalesce all partitions into one, as we will keep a lot of input rows in memory and the time to consume up all the sorted iterators is long.

This PR takes over https://github.com/apache/spark/pull/18543 , the idea is that, we do not keep the record comparator instance in `UnsafeExternalSorter`, but a generator of record comparator.

close #18543

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18679 from cloud-fan/memory-leak.
2017-07-27 22:56:26 +08:00
Takuya UESHIN 2ff35a057e [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add ArrayType and StructType support.
## What changes were proposed in this pull request?

This is a refactoring of `ArrowConverters` and related classes.

1. Refactor `ColumnWriter` as `ArrowWriter`.
2. Add `ArrayType` and `StructType` support.
3. Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` creation.

## How was this patch tested?

Added some tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18655 from ueshin/issues/SPARK-21440.
2017-07-27 19:19:51 +08:00
Kazuaki Ishizaki ebbe589d12 [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8
## What changes were proposed in this pull request?

This PR ensures that `Unsafe.sizeInBytes` must be a multiple of 8. It it is not satisfied. `Unsafe.hashCode` causes the assertion violation.

## How was this patch tested?

Will add test cases

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18503 from kiszk/SPARK-21271.
2017-07-27 15:27:24 +08:00
hyukjinkwon 60472dbfd9 [SPARK-21485][SQL][DOCS] Spark SQL documentation generation for built-in functions
## What changes were proposed in this pull request?

This generates a documentation for Spark SQL built-in functions.

One drawback is, this requires a proper build to generate built-in function list.
Once it is built, it only takes few seconds by `sql/create-docs.sh`.

Please see https://spark-test.github.io/sparksqldoc/ that I hosted to show the output documentation.

There are few more works to be done in order to make the documentation pretty, for example, separating `Arguments:` and `Examples:` but I guess this should be done within `ExpressionDescription` and `ExpressionInfo` rather than manually parsing it. I will fix these in a follow up.

This requires `pip install mkdocs` to generate HTMLs from markdown files.

## How was this patch tested?

Manually tested:

```
cd docs
jekyll build
```
,

```
cd docs
jekyll serve
```

and

```
cd sql
create-docs.sh
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18702 from HyukjinKwon/SPARK-21485.
2017-07-26 09:38:51 -07:00
gatorsmile ebc24a9b7f [SPARK-20586][SQL] Add deterministic to ScalaUDF
### What changes were proposed in this pull request?
Like [Hive UDFType](https://hive.apache.org/javadocs/r2.0.1/api/org/apache/hadoop/hive/ql/udf/UDFType.html), we should allow users to add the extra flags for ScalaUDF and JavaUDF too. _stateful_/_impliesOrder_ are not applicable to our Scala UDF. Thus, we only add the following two flags.

- deterministic: Certain optimizations should not be applied if UDF is not deterministic. Deterministic UDF returns same result each time it is invoked with a particular input. This determinism just needs to hold within the context of a query.

When the deterministic flag is not correctly set, the results could be wrong.

For ScalaUDF in Dataset APIs, users can call the following extra APIs for `UserDefinedFunction` to make the corresponding changes.
- `nonDeterministic`: Updates UserDefinedFunction to non-deterministic.

Also fixed the Java UDF name loss issue.

Will submit a separate PR for `distinctLike`  for UDAF

### How was this patch tested?
Added test cases for both ScalaUDF

Author: gatorsmile <gatorsmile@gmail.com>
Author: Wenchen Fan <cloud0fan@gmail.com>

Closes #17848 from gatorsmile/udfRegister.
2017-07-25 17:19:44 -07:00
Kazuaki Ishizaki 7f295059ca [SPARK-21516][SQL][TEST] Overriding afterEach() in DatasetCacheSuite must call super.afterEach()
## What changes were proposed in this pull request?

This PR ensures to call `super.afterEach()` in overriding `afterEach()` method in `DatasetCacheSuite`. When we override `afterEach()` method in Testsuite, we have to call `super.afterEach()`.

This is a follow-up of #18719 and SPARK-21512.

## How was this patch tested?

Used the existing test suite

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18721 from kiszk/SPARK-21516.
2017-07-25 10:51:00 +08:00
Wenchen Fan 86664338f2 [SPARK-17528][SQL][FOLLOWUP] remove unnecessary data copy in object hash aggregate
## What changes were proposed in this pull request?

In #18483 , we fixed the data copy bug when saving into `InternalRow`, and removed all workarounds for this bug in the aggregate code path. However, the object hash aggregate was missed, this PR fixes it.

This patch is also a requirement for #17419 , which shows that DataFrame version is slower than RDD version because of this issue.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18712 from cloud-fan/minor.
2017-07-24 10:18:28 -07:00
Kazuaki Ishizaki 481f079294 [SPARK-21512][SQL][TEST] DatasetCacheSuite needs to execute unpersistent after executing peristent
## What changes were proposed in this pull request?

This PR avoids to reuse unpersistent dataset among test cases by making dataset unpersistent at the end of each test case.

In `DatasetCacheSuite`, the test case `"get storage level"` does not make dataset unpersisit after make the dataset persisitent. The same dataset will be made persistent by the test case `"persist and then rebind right encoder when join 2 datasets"` Thus, we run these test cases, the second case does not perform to make dataset persistent. This is because in

When we run only the second case, it performs to make dataset persistent. It is not good to change behavior of the second test suite. The first test case should correctly make dataset unpersistent.

```
Testing started at 17:52 ...
01:52:15.053 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
01:52:48.595 WARN org.apache.spark.sql.execution.CacheManager: Asked to cache already cached data.
01:52:48.692 WARN org.apache.spark.sql.execution.CacheManager: Asked to cache already cached data.
01:52:50.864 WARN org.apache.spark.storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
01:52:50.864 WARN org.apache.spark.storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
01:52:50.868 WARN org.apache.spark.storage.BlockManager: Block rdd_8_1 replicated to only 0 peer(s) instead of 1 peers
01:52:50.868 WARN org.apache.spark.storage.BlockManager: Block rdd_8_0 replicated to only 0 peer(s) instead of 1 peers
```

After this PR, these messages do not appear
```
Testing started at 18:14 ...
02:15:05.329 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Process finished with exit code 0
```

## How was this patch tested?

Used the existing test

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18719 from kiszk/SPARK-21512.
2017-07-23 11:31:27 -07:00
Reynold Xin a4eac8b0bb [MINOR] Remove **** in test case names in FlatMapGroupsWithStateSuite
## What changes were proposed in this pull request?
This patch removes the `****` string from test names in FlatMapGroupsWithStateSuite. `***` is a common string developers grep for when using Scala test (because it immediately shows the failing test cases). The existence of the `****` in test names disrupts that workflow.

## How was this patch tested?
N/A - test only change.

Author: Reynold Xin <rxin@databricks.com>

Closes #18715 from rxin/FlatMapGroupsWithStateStar.
2017-07-23 10:41:38 -07:00
pj.fanning 2a53fbfce7 [SPARK-20871][SQL] limit logging of Janino code
## What changes were proposed in this pull request?

When the code that is generated is greater than 64k, then Janino compile will fail and CodeGenerator.scala will log the entire code at Error level.
SPARK-20871 suggests only logging the code at Debug level.
Since, the code is already logged at debug level, this Pull Request proposes not including the formatted code in the Error logging and exception message at all.
When an exception occurs, the code will be logged at Info level but truncated if it is more than 1000 lines long.

## How was this patch tested?

Existing tests were run.
An extra test test case was added to CodeFormatterSuite to test the new maxLines parameter,

Author: pj.fanning <pj.fanning@workday.com>

Closes #18658 from pjfanning/SPARK-20871.
2017-07-23 10:38:03 -07:00
Wenchen Fan ccaee5b54d [SPARK-10063] Follow-up: remove a useless test related to an old output committer
## What changes were proposed in this pull request?

It's a follow-up of https://github.com/apache/spark/pull/18689 , which forgot to remove a useless test.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18716 from cloud-fan/test.
2017-07-23 21:32:59 +08:00
Takuya UESHIN 2f1468429f [SPARK-21472][SQL][FOLLOW-UP] Introduce ArrowColumnVector as a reader for Arrow vectors.
## What changes were proposed in this pull request?

This is a follow-up of #18680.

In some environment, a compile error happens saying:

```
.../sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243:
error: not found: type Array
  public void loadBytes(Array array) {
                        ^
```

This pr fixes it.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18701 from ueshin/issues/SPARK-21472_fup1.
2017-07-21 21:06:56 +08:00
Wenchen Fan 3ac6093086 [SPARK-10063] Follow-up: remove dead code related to an old output committer
## What changes were proposed in this pull request?

DirectParquetOutputCommitter was removed from Spark as it was deemed unsafe to use. We however still have some code to generate warning. This patch removes those code as well.

This is kind of a follow-up of https://github.com/apache/spark/pull/16796

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18689 from cloud-fan/minor.
2017-07-20 12:08:20 -07:00
Takuya UESHIN cb19880cd8 [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
## What changes were proposed in this pull request?

Introducing `ArrowColumnVector` as a reader for Arrow vectors.
It extends `ColumnVector`, so we will be able to use it with `ColumnarBatch` and its functionalities.
Currently it supports primitive types and `StringType`, `ArrayType` and `StructType`.

## How was this patch tested?

Added tests for `ArrowColumnVector` and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18680 from ueshin/issues/SPARK-21472.
2017-07-20 21:00:30 +08:00
gatorsmile 256358f66a [SPARK-21477][SQL][MINOR] Mark LocalTableScanExec's input data transient
## What changes were proposed in this pull request?
This PR is to mark the parameter `rows` and `unsafeRow` of LocalTableScanExec transient. It can avoid serializing the unneeded objects.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18686 from gatorsmile/LocalTableScanExec.
2017-07-20 19:16:26 +08:00
Xiang Gao b7a40f64e6 [SPARK-16542][SQL][PYSPARK] Fix bugs about types that result an array of null when creating DataFrame using python
## What changes were proposed in this pull request?
This is the reopen of https://github.com/apache/spark/pull/14198, with merge conflicts resolved.

ueshin Could you please take a look at my code?

Fix bugs about types that result an array of null when creating DataFrame using python.

Python's array.array have richer type than python itself, e.g. we can have `array('f',[1,2,3])` and `array('d',[1,2,3])`. Codes in spark-sql and pyspark didn't take this into consideration which might cause a problem that you get an array of null values when you have `array('f')` in your rows.

A simple code to reproduce this bug is:

```
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row,DataFrame
from array import array

sc = SparkContext()
sqlContext = SQLContext(sc)

row1 = Row(floatarray=array('f',[1,2,3]), doublearray=array('d',[1,2,3]))
rows = sc.parallelize([ row1 ])
df = sqlContext.createDataFrame(rows)
df.show()
```

which have output

```
+---------------+------------------+
|    doublearray|        floatarray|
+---------------+------------------+
|[1.0, 2.0, 3.0]|[null, null, null]|
+---------------+------------------+
```

## How was this patch tested?

New test case added

Author: Xiang Gao <qasdfgtyuiop@gmail.com>
Author: Gao, Xiang <qasdfgtyuiop@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18444 from zasdfgbnm/fix_array_infer.
2017-07-20 12:46:06 +09:00
Burak Yavuz 2c9d5ef1f0 [SPARK-21463] Allow userSpecifiedSchema to override partition inference performed by MetadataLogFileIndex
## What changes were proposed in this pull request?

When using the MetadataLogFileIndex to read back a table, we don't respect the user provided schema as the proper column types. This can lead to issues when trying to read strings that look like dates that get truncated to DateType, or longs being truncated to IntegerType, just because a long value doesn't exist.

## How was this patch tested?

Unit tests and manual tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #18676 from brkyvz/stream-partitioning.
2017-07-19 15:56:26 -07:00
Corey Woodfield 8cd9cdf17a [SPARK-21333][DOCS] Removed invalid joinTypes from javadoc of Dataset#joinWith
## What changes were proposed in this pull request?

Two invalid join types were mistakenly listed in the javadoc for joinWith, in the Dataset class. I presume these were copied from the javadoc of join, but since joinWith returns a Dataset\<Tuple2\>, left_semi and left_anti are invalid, as they only return values from one of the datasets, instead of from both

## How was this patch tested?

I ran the following code :
```
public static void main(String[] args) {
	SparkSession spark = new SparkSession(new SparkContext("local[*]", "Test"));
	Dataset<Row> one = spark.createDataFrame(Arrays.asList(new Bean(1), new Bean(2), new Bean(3), new Bean(4), new Bean(5)), Bean.class);
	Dataset<Row> two = spark.createDataFrame(Arrays.asList(new Bean(4), new Bean(5), new Bean(6), new Bean(7), new Bean(8), new Bean(9)), Bean.class);

	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "inner").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "cross").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_semi").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_anti").show();} catch (Exception e) {e.printStackTrace();}
}
```
which tests all the different join types, and the last two (left_semi and left_anti) threw exceptions. The same code using join instead of joinWith did fine. The Bean class was just a java bean with a single int field, x.

Author: Corey Woodfield <coreywoodfield@gmail.com>

Closes #18462 from coreywoodfield/master.
2017-07-19 15:21:38 -07:00
DFFuture c9729187bc [SPARK-21446][SQL] Fix setAutoCommit never executed
## What changes were proposed in this pull request?
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-21446
options.asConnectionProperties can not have fetchsize,because fetchsize belongs to Spark-only options, and Spark-only options have been excluded in connection properities.
So change properties of beforeFetch from  options.asConnectionProperties.asScala.toMap to options.asProperties.asScala.toMap

## How was this patch tested?

Author: DFFuture <albert.zhang23@gmail.com>

Closes #18665 from DFFuture/sparksql_pg.
2017-07-19 14:45:11 -07:00
Tathagata Das 70fe99dc62 [SPARK-21464][SS] Minimize deprecation warnings caused by ProcessingTime class
## What changes were proposed in this pull request?

Use of `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. However interval uses to ProcessingTime causes deprecation warnings during compilation. This cannot be avoided entirely as even though it is deprecated as a public API, ProcessingTime instances are used internally in TriggerExecutor. This PR is to minimize the warning by removing its uses from tests as much as possible.

## How was this patch tested?
Existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18678 from tdas/SPARK-21464.
2017-07-19 11:02:07 -07:00
donnyzone 6b6dd682e8 [SPARK-21441][SQL] Incorrect Codegen in SortMergeJoinExec results failures in some cases
## What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441

This issue can be reproduced by the following example:

```
val spark = SparkSession
   .builder()
   .appName("smj-codegen")
   .master("local")
   .config("spark.sql.autoBroadcastJoinThreshold", "1")
   .getOrCreate()
val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str")
val df = df1.join(df2, df1("key") === df2("key"))
   .filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1")
   .select("int")
   df.show()
```

To conclude, the issue happens when:
(1) SortMergeJoin condition contains CodegenFallback expressions.
(2) In PhysicalPlan tree, SortMergeJoin node  is the child of root node, e.g., the Project in above example.

This patch fixes the logic in `CollapseCodegenStages` rule.

## How was this patch tested?
Unit test and manual verification in our cluster.

Author: donnyzone <wellfengzhu@gmail.com>

Closes #18656 from DonnyZone/Fix_SortMergeJoinExec.
2017-07-19 21:48:54 +08:00
jinxing 4eb081cc87 [SPARK-21414] Refine SlidingWindowFunctionFrame to avoid OOM.
## What changes were proposed in this pull request?

In `SlidingWindowFunctionFrame`, it is now adding all rows to the buffer for which the input row value is equal to or less than the output row upper bound, then drop all rows from the buffer for which the input row value is smaller than the output row lower bound.
This could result in the buffer is very big though the window is small.
For example:
```
select a, b, sum(a)
over (partition by b order by a range between 1000000 following and 1000001 following)
from table
```
We can refine the logic and just add the qualified rows into buffer.

## How was this patch tested?
Manual test:
Run sql
`select shop, shopInfo, district, sum(revenue) over(partition by district order by revenue range between 100 following and 200 following) from revenueList limit 10`
against a table with 4  columns(shop: String, shopInfo: String, district: String, revenue: Int). The biggest partition is around 2G bytes, containing 200k lines.
Configure the executor with 2G bytes memory.
With the change in this pr, it works find. Without this change, below exception will be thrown.
```
MemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
	at org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

Author: jinxing <jinxing6042@126.com>

Closes #18634 from jinxing64/SPARK-21414.
2017-07-19 21:35:26 +08:00
gatorsmile ae253e5a87 [SPARK-21273][SQL][FOLLOW-UP] Propagate logical plan stats using visitor pattern and mixin
## What changes were proposed in this pull request?
This PR is to add back the stats propagation of `Window` and remove the stats calculation of the leaf node `Range`, which has been covered by 9c32d2507d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala (L56)

## How was this patch tested?
Added two test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18677 from gatorsmile/visitStats.
2017-07-19 10:57:15 +08:00
xuanyuanking 81c99a5b95 [SPARK-21435][SQL] Empty files should be skipped while write to file
## What changes were proposed in this pull request?

Add EmptyDirectoryWriteTask for empty task while writing files. Fix the empty result for parquet format by leaving the first partition for meta writing.

## How was this patch tested?

Add new test in `FileFormatWriterSuite `

Author: xuanyuanking <xyliyuanjian@gmail.com>

Closes #18654 from xuanyuanking/SPARK-21435.
2017-07-19 10:27:42 +08:00
Tathagata Das 84f1b25f31 [SPARK-21462][SS] Added batchId to StreamingQueryProgress.json
## What changes were proposed in this pull request?

- Added batchId to StreamingQueryProgress.json as that was missing from the generated json.
- Also, removed recently added numPartitions from StatefulOperatorProgress as this value does not change through the query run, and there are other ways to find that.

## How was this patch tested?
Updated unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18675 from tdas/SPARK-21462.
2017-07-18 16:29:45 -07:00
Wenchen Fan f18b905f6c [SPARK-21457][SQL] ExternalCatalog.listPartitions should correctly handle partition values with dot
## What changes were proposed in this pull request?

When we list partitions from hive metastore with a partial partition spec, we are expecting exact matching according to the partition values. However, hive treats dot specially and match any single character for dot. We should do an extra filter to drop unexpected partitions.

## How was this patch tested?

new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18671 from cloud-fan/hive.
2017-07-18 15:56:16 -07:00
Sean Owen e26dac5feb [SPARK-21415] Triage scapegoat warnings, part 1
## What changes were proposed in this pull request?

Address scapegoat warnings for:
- BigDecimal double constructor
- Catching NPE
- Finalizer without super
- List.size is O(n)
- Prefer Seq.empty
- Prefer Set.empty
- reverse.map instead of reverseMap
- Type shadowing
- Unnecessary if condition.
- Use .log1p
- Var could be val

In some instances like Seq.empty, I avoided making the change even where valid in test code to keep the scope of the change smaller. Those issues are concerned with performance and it won't matter for tests.

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #18635 from srowen/Scapegoat1.
2017-07-18 08:47:17 +01:00
aokolnychyi 0be5fb41a6 [SPARK-21332][SQL] Incorrect result type inferred for some decimal expressions
## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #18583 from aokolnychyi/spark-21332.
2017-07-17 21:07:50 -07:00
Tathagata Das e9faae135c [SPARK-21409][SS] Follow up PR to allow different types of custom metrics to be exposed
## What changes were proposed in this pull request?

Implementation may expose both timing as well as size metrics. This PR enables that.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18661 from tdas/SPARK-21409-2.
2017-07-17 19:28:55 -07:00
gatorsmile a8c6d0f64e [MINOR] Improve SQLConf messages
### What changes were proposed in this pull request?
The current SQLConf messages of `spark.sql.hive.convertMetastoreParquet` and `spark.sql.hive.convertMetastoreOrc` are not very clear to end users. This PR is to improve them.

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18657 from gatorsmile/msgUpdates.
2017-07-18 09:15:18 +08:00
Tathagata Das 9d8c83179a [SPARK-21409][SS] Expose state store memory usage in SQL metrics and progress updates
## What changes were proposed in this pull request?

Currently, there is no tracking of memory usage of state stores. This JIRA is to expose that through SQL metrics and StreamingQueryProgress.

Additionally, added the ability to expose implementation-specific metrics through the StateStore APIs to the SQLMetrics.

## How was this patch tested?
Added unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18629 from tdas/SPARK-21409.
2017-07-17 16:48:15 -07:00
gatorsmile e398c28146 [SPARK-21354][SQL] INPUT FILE related functions do not support more than one sources
### What changes were proposed in this pull request?
The build-in functions `input_file_name`, `input_file_block_start`, `input_file_block_length` do not support more than one sources, like what Hive does. Currently, Spark does not block it and the outputs are ambiguous/non-deterministic. It could be from any side.

```
hive> select *, INPUT__FILE__NAME FROM t1, t2;
FAILED: SemanticException Column INPUT__FILE__NAME Found in more than One Tables/Subqueries
```

This PR blocks it and issues an error.

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18580 from gatorsmile/inputFileName.
2017-07-17 14:58:14 +08:00
Sean Owen fd52a747fd [SPARK-19810][SPARK-19810][MINOR][FOLLOW-UP] Follow-ups from to remove Scala 2.10
## What changes were proposed in this pull request?

Follow up to a few comments on https://github.com/apache/spark/pull/17150#issuecomment-315020196 that couldn't be addressed before it was merged.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #18646 from srowen/SPARK-19810.2.
2017-07-17 09:22:42 +08:00
Kazuaki Ishizaki ac5d5d7959 [SPARK-21344][SQL] BinaryType comparison does signed byte array comparison
## What changes were proposed in this pull request?

This PR fixes a wrong comparison for `BinaryType`. This PR enables unsigned comparison and unsigned prefix generation for an array for `BinaryType`. Previous implementations uses signed operations.

## How was this patch tested?

Added a test suite in `OrderingSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18571 from kiszk/SPARK-21344.
2017-07-14 20:16:04 -07:00
Shixiong Zhu 2d968a07d2 [SPARK-21421][SS] Add the query id as a local property to allow source and sink using it
## What changes were proposed in this pull request?

Add the query id as a local property to allow source and sink using it.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #18638 from zsxwing/SPARK-21421.
2017-07-14 14:37:27 -07:00
Sean Owen 425c4ada4c [SPARK-19810][BUILD][CORE] Remove support for Scala 2.10
## What changes were proposed in this pull request?

- Remove Scala 2.10 build profiles and support
- Replace some 2.10 support in scripts with commented placeholders for 2.12 later
- Remove deprecated API calls from 2.10 support
- Remove usages of deprecated context bounds where possible
- Remove Scala 2.10 workarounds like ScalaReflectionLock
- Other minor Scala warning fixes

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #17150 from srowen/SPARK-19810.
2017-07-13 17:06:24 +08:00
Wenchen Fan 780586a9f2 [SPARK-17701][SQL] Refactor RowDataSourceScanExec so its sameResult call does not compare strings
## What changes were proposed in this pull request?

Currently, `RowDataSourceScanExec` and `FileSourceScanExec` rely on a "metadata" string map to implement equality comparison, since the RDDs they depend on cannot be directly compared. This has resulted in a number of correctness bugs around exchange reuse, e.g. SPARK-17673 and SPARK-16818.

To make these comparisons less brittle, we should refactor these classes to compare constructor parameters directly instead of relying on the metadata map.

This PR refactors `RowDataSourceScanExec`, `FileSourceScanExec` will be fixed in the follow-up PR.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18600 from cloud-fan/minor.
2017-07-12 09:23:54 -07:00
liuxian aaad34dc2f [SPARK-21007][SQL] Add SQL function - RIGHT && LEFT
## What changes were proposed in this pull request?
 Add  SQL function - RIGHT && LEFT, same as MySQL:
https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left
https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right

## How was this patch tested?
unit test

Author: liuxian <liu.xian3@zte.com.cn>

Closes #18228 from 10110346/lx-wip-0607.
2017-07-12 18:51:19 +08:00
Xiao Li f587d2e3fa [SPARK-20842][SQL] Upgrade to 1.2.2 for Hive Metastore Client 1.2
### What changes were proposed in this pull request?
Hive 1.2.2 release is available. Below is the list of bugs fixed in 1.2.2

https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12332952&styleName=Text&projectId=12310843

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18063 from gatorsmile/upgradeHiveClientTo1.2.2.
2017-07-12 15:48:44 +08:00
Burak Yavuz e0af76a36a [SPARK-21370][SS] Add test for state reliability when one read-only state store aborts after read-write state store commits
## What changes were proposed in this pull request?

During Streaming Aggregation, we have two StateStores per task, one used as read-only in
`StateStoreRestoreExec`, and one read-write used in `StateStoreSaveExec`. `StateStore.abort`
will be called for these StateStores if they haven't committed their results. We need to
make sure that `abort` in read-only store after a `commit` in the read-write store doesn't
accidentally lead to the deletion of state.

This PR adds a test for this condition.

## How was this patch tested?

This PR adds a test.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #18603 from brkyvz/ss-test.
2017-07-12 00:39:09 -07:00
Jane Wang 2cbfc975ba [SPARK-12139][SQL] REGEX Column Specification
## What changes were proposed in this pull request?
Hive interprets regular expression, e.g., `(a)?+.+` in query specification. This PR enables spark to support this feature when hive.support.quoted.identifiers is set to true.

## How was this patch tested?

- Add unittests in SQLQuerySuite.scala
- Run spark-shell tested the original failed query:
scala> hc.sql("SELECT `(a|b)?+.+` from test1").collect.foreach(println)

Author: Jane Wang <janewang@fb.com>

Closes #18023 from janewangfb/support_select_regex.
2017-07-11 22:00:36 -07:00
gatorsmile d3e071658f [SPARK-19285][SQL] Implement UDF0
### What changes were proposed in this pull request?
This PR is to implement UDF0. `UDF0` is needed when users need to implement a JAVA UDF with no argument.

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18598 from gatorsmile/udf0.
2017-07-11 15:44:29 -07:00
hyukjinkwon ebc124d4c4 [SPARK-21365][PYTHON] Deduplicate logics parsing DDL type/schema definition
## What changes were proposed in this pull request?

This PR deals with four points as below:

- Reuse existing DDL parser APIs rather than reimplementing within PySpark

- Support DDL formatted string, `field type, field type`.

- Support case-insensitivity for parsing.

- Support nested data types as below:

  **Before**
  ```
  >>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
  ...
  ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
  ```

  ```
  >>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
  ...
  ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
  ```

  ```
  >>> spark.createDataFrame([[1]], "a int").show()
  ...
  ValueError: Could not parse datatype: a int
  ```

  **After**
  ```
  >>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
  +---+
  |  a|
  +---+
  |[1]|
  +---+
  ```

  ```
  >>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
  +---+
  |  a|
  +---+
  |[1]|
  +---+
  ```

  ```
  >>> spark.createDataFrame([[1]], "a int").show()
  +---+
  |  a|
  +---+
  |  1|
  +---+
  ```

## How was this patch tested?

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18590 from HyukjinKwon/deduplicate-python-ddl.
2017-07-11 22:03:10 +08:00
Xingbo Jiang 66d2168655 [SPARK-21366][SQL][TEST] Add sql test for window functions
## What changes were proposed in this pull request?

Add sql test for window functions, also remove uncecessary test cases in `WindowQuerySuite`.

## How was this patch tested?

Added `window.sql` and the corresponding output file.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18591 from jiangxb1987/window.
2017-07-11 21:52:54 +08:00
hyukjinkwon 7514db1dec [SPARK-21263][SQL] Do not allow partially parsing double and floats via NumberFormat in CSV
## What changes were proposed in this pull request?

This PR proposes to remove `NumberFormat.parse` use to disallow a case of partially parsed data. For example,

```
scala> spark.read.schema("a DOUBLE").option("mode", "FAILFAST").csv(Seq("10u12").toDS).show()
+----+
|   a|
+----+
|10.0|
+----+
```

## How was this patch tested?

Unit tests added in `UnivocityParserSuite` and `CSVSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18532 from HyukjinKwon/SPARK-21263.
2017-07-11 11:11:08 +01:00
Michael Allman a4baa8f48f [SPARK-20331][SQL] Enhanced Hive partition pruning predicate pushdown
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-20331)

## What changes were proposed in this pull request?

Spark 2.1 introduced scalable support for Hive tables with huge numbers of partitions. Key to leveraging this support is the ability to prune unnecessary table partitions to answer queries. Spark supports a subset of the class of partition pruning predicates that the Hive metastore supports. If a user writes a query with a partition pruning predicate that is *not* supported by Spark, Spark falls back to loading all partitions and pruning client-side. We want to broaden Spark's current partition pruning predicate pushdown capabilities.

One of the key missing capabilities is support for disjunctions. For example, for a table partitioned by date, writing a query with a predicate like

    date = 20161011 or date = 20161014

will result in Spark fetching all partitions. For a table partitioned by date and hour, querying a range of hours across dates can be quite difficult to accomplish without fetching all partition metadata.

The current partition pruning support supports only comparisons against literals. We can expand that to foldable expressions by evaluating them at planning time.

We can also implement support for the "IN" comparison by expanding it to a sequence of "OR"s.

## How was this patch tested?

The `HiveClientSuite` and `VersionsSuite` were refactored and simplified to make Hive client-based, version-specific testing more modular and conceptually simpler. There are now two Hive test suites: `HiveClientSuite` and `HivePartitionFilteringSuite`. These test suites have a single-argument constructor taking a `version` parameter. As such, these test suites cannot be run by themselves. Instead, they have been bundled into "aggregation" test suites which run each suite for each Hive client version. These aggregation suites are called `HiveClientSuites` and `HivePartitionFilteringSuites`. The `VersionsSuite` and `HiveClientSuite` have been refactored into each of these aggregation suites, respectively.

`HiveClientSuite` and `HivePartitionFilteringSuite` subclass a new abstract class, `HiveVersionSuite`. `HiveVersionSuite` collects functionality related to testing a single Hive version and overrides relevant test suite methods to display version-specific information.

A new trait, `HiveClientVersions`, has been added with a sequence of Hive test versions.

Author: Michael Allman <michael@videoamp.com>

Closes #17633 from mallman/spark-20331-enhanced_partition_pruning_pushdown.
2017-07-11 14:50:11 +08:00
jinxing 97a1aa2c70 [SPARK-21315][SQL] Skip some spill files when generateIterator(startIndex) in ExternalAppendOnlyUnsafeRowArray.
## What changes were proposed in this pull request?

In current code, it is expensive to use `UnboundedFollowingWindowFunctionFrame`, because it is iterating from the start to lower bound every time calling `write` method. When traverse the iterator, it's possible to skip some spilled files thus to save some time.

## How was this patch tested?

Added unit test

Did a small test for benchmark:

Put 2000200 rows into `UnsafeExternalSorter`-- 2 spill files(each contains 1000000 rows) and inMemSorter contains 200 rows.
Move the iterator forward to index=2000001.

*With this change*:
`getIterator(2000001)`, it will cost almost 0ms~1ms;
*Without this change*:
`for(int i=0; i<2000001; i++)geIterator().loadNext()`, it will cost 300ms.

Author: jinxing <jinxing6042@126.com>

Closes #18541 from jinxing64/SPARK-21315.
2017-07-11 11:47:47 +08:00
gatorsmile 1471ee7af5 [SPARK-21350][SQL] Fix the error message when the number of arguments is wrong when invoking a UDF
### What changes were proposed in this pull request?
Users get a very confusing error when users specify a wrong number of parameters.
```Scala
    val df = spark.emptyDataFrame
    spark.udf.register("foo", (_: String).length)
    df.selectExpr("foo(2, 3, 4)")
```
```
org.apache.spark.sql.UDFSuite$$anonfun$9$$anonfun$apply$mcV$sp$12 cannot be cast to scala.Function3
java.lang.ClassCastException: org.apache.spark.sql.UDFSuite$$anonfun$9$$anonfun$apply$mcV$sp$12 cannot be cast to scala.Function3
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:109)
```

This PR is to capture the exception and issue an error message that is consistent with what we did for built-in functions. After the fix, the error message is improved to
```
Invalid number of arguments for function foo; line 1 pos 0
org.apache.spark.sql.AnalysisException: Invalid number of arguments for function foo; line 1 pos 0
	at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:119)
```

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18574 from gatorsmile/statsCheck.
2017-07-11 11:19:59 +08:00
Takeshi Yamamuro a2bec6c92a [SPARK-21043][SQL] Add unionByName in Dataset
## What changes were proposed in this pull request?
This pr added `unionByName` in `DataSet`.
Here is how to use:
```
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   6|   4|   5|
// +----+----+----+
```

## How was this patch tested?
Added tests in `DataFrameSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #18300 from maropu/SPARK-21043-2.
2017-07-10 20:16:29 -07:00