Commit graph

6496 commits

Author SHA1 Message Date
Wenchen Fan ac4ca7c4dd [SPARK-24012][SQL][TEST][FOLLOWUP] add unit test
## What changes were proposed in this pull request?

a followup of https://github.com/apache/spark/pull/21100

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21154 from cloud-fan/test.
2018-04-25 13:42:44 -07:00
Tathagata Das 396938ef02 [SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources
## What changes were proposed in this pull request?

In some streaming queries, the input and processing rates are not calculated at all (shows up as zero) because MicroBatchExecution fails to associated metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The way this executed-plan-leaf-to-logical-source attribution works is as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source. So did a best-effort attempt to match logical and execution plan leaves when the number of leaves were same. In cases where the number of leaves is different, we just give up and report zero rates. An example where this may happen is as follows.

```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream....
```
In this case, the `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it only has leaf because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves causing the input rates to be computed as zero.

With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So its easy to associate the metrics to the original streaming sources.

In this PR, the solution is as follows. If all the streaming sources in a streaming query as v2 sources, then use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to existing mapping logic.

## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21126 from tdas/SPARK-24050.
2018-04-25 12:21:55 -07:00
Takeshi Yamamuro 20ca208bcd [SPARK-23880][SQL] Do not trigger any jobs for caching data
## What changes were proposed in this pull request?
This pr fixed code so that `cache` could prevent any jobs from being triggered.
For example, in the current master, an operation below triggers a actual job;
```
val df = spark.range(10000000000L)
  .filter('id > 1000)
  .orderBy('id.desc)
  .cache()
```
This triggers a job while the cache should be lazy. The problem is that, when creating `InMemoryRelation`, we build the RDD, which calls `SparkPlan.execute` and may trigger jobs, like sampling job for range partitioner, or broadcast job.

This pr removed the code to build a cached `RDD` in the constructor of `InMemoryRelation` and added `CachedRDDBuilder` to lazily build the `RDD` in `InMemoryRelation`. Then, the first call of `CachedRDDBuilder.cachedColumnBuffers` triggers a job to materialize the cache in  `InMemoryTableScanExec` .

## How was this patch tested?
Added tests in `CachedTableSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21018 from maropu/SPARK-23880.
2018-04-25 19:06:18 +08:00
liutang123 64e8408e6f [SPARK-24012][SQL] Union of map and other compatible column
## What changes were proposed in this pull request?
Union of map and other compatible column result in unresolved operator 'Union; exception

Reproduction
`spark-sql>select map(1,2), 'str' union all select map(1,2,3,null), 1`
Output:
```
Error in query: unresolved operator 'Union;;
'Union
:- Project [map(1, 2) AS map(1, 2)#106, str AS str#107]
:  +- OneRowRelation$
+- Project [map(1, cast(2 as int), 3, cast(null as int)) AS map(1, CAST(2 AS INT), 3, CAST(NULL AS INT))#109, 1 AS 1#108]
   +- OneRowRelation$
```
So, we should cast part of columns to be compatible when appropriate.

## How was this patch tested?
Added a test (query union of map and other columns) to SQLQueryTestSuite's union.sql.

Author: liutang123 <liutang123@yeah.net>

Closes #21100 from liutang123/SPARK-24012.
2018-04-25 18:10:51 +08:00
mn-mikke 5fea17b3be [SPARK-23821][SQL] Collection function: flatten
## What changes were proposed in this pull request?

This PR adds a new collection function that transforms an array of arrays into a single array. The PR comprises:
- An expression for flattening array structure
- Flatten function
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(Seq(1, 2), Seq(4, 5)),
  Seq(null, Seq(1))
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(flatten($"i")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 064 */               project_numElements,
/* 065 */               4);
/* 066 */             if (project_size > 2147483632) {
/* 067 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 068 */                 project_size + " bytes of data due to exceeding the limit 2147483632" +
/* 069 */                 " bytes for UnsafeArrayData.");
/* 070 */             }
/* 071 */
/* 072 */             byte[] project_array = new byte[(int)project_size];
/* 073 */             UnsafeArrayData project_tempArrayData = new UnsafeArrayData();
/* 074 */             Platform.putLong(project_array, 16, project_numElements);
/* 075 */             project_tempArrayData.pointTo(project_array, 16, (int)project_size);
/* 076 */             int project_counter = 0;
/* 077 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 078 */               ArrayData arr = inputadapter_value.getArray(k);
/* 079 */               for (int l = 0; l < arr.numElements(); l++) {
/* 080 */                 if (arr.isNullAt(l)) {
/* 081 */                   project_tempArrayData.setNullAt(project_counter);
/* 082 */                 } else {
/* 083 */                   project_tempArrayData.setInt(
/* 084 */                     project_counter,
/* 085 */                     arr.getInt(l)
/* 086 */                   );
/* 087 */                 }
/* 088 */                 project_counter++;
/* 089 */               }
/* 090 */             }
/* 091 */             project_value = project_tempArrayData;
/* 092 */
/* 093 */           }
/* 094 */
/* 095 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq(Seq("a", "b"), Seq(null, "d")),
  Seq(null, Seq("a"))
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(flatten($"s")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             Object[] project_arrayObject = new Object[(int)project_numElements];
/* 064 */             int project_counter = 0;
/* 065 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 066 */               ArrayData arr = inputadapter_value.getArray(k);
/* 067 */               for (int l = 0; l < arr.numElements(); l++) {
/* 068 */                 project_arrayObject[project_counter] = arr.getUTF8String(l);
/* 069 */                 project_counter++;
/* 070 */               }
/* 071 */             }
/* 072 */             project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObject);
/* 073 */
/* 074 */           }
/* 075 */
/* 076 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #20938 from mn-mikke/feature/array-api-flatten-to-master.
2018-04-25 11:19:08 +09:00
Jose Torres d6c26d1c9a [SPARK-24038][SS] Refactor continuous writing to its own class
## What changes were proposed in this pull request?

Refactor continuous writing to its own class.

See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction this is going, but I think this PR is very isolated and necessary anyway.

## How was this patch tested?

existing unit tests - refactoring only

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21116 from jose-torres/SPARK-24038.
2018-04-24 17:06:03 -07:00
Takeshi Yamamuro 4926a7c2f0 [SPARK-23589][SQL][FOLLOW-UP] Reuse InternalRow in ExternalMapToCatalyst eval
## What changes were proposed in this pull request?
This pr is a follow-up of #20980 and fixes code to reuse `InternalRow` for converting input keys/values in `ExternalMapToCatalyst` eval.

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21137 from maropu/SPARK-23589-FOLLOWUP.
2018-04-24 17:52:05 +02:00
seancxmao c303b1b676 [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId
## What changes were proposed in this pull request?
Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: d28d5732ae/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala (L66)

## How was this patch tested?
N/A

Author: seancxmao <seancxmao@gmail.com>

Closes #21113 from seancxmao/SPARK-13136.
2018-04-24 16:16:07 +08:00
Marco Gaido 281c1ca0dc [SPARK-23973][SQL] Remove consecutive Sorts
## What changes were proposed in this pull request?

In SPARK-23375 we introduced the ability of removing `Sort` operation during query optimization if the data is already sorted. In this follow-up we remove also a `Sort` which is followed by another `Sort`: in this case the first sort is not needed and can be safely removed.

The PR starts from henryr's comment: https://github.com/apache/spark/pull/20560#discussion_r180601594. So credit should be given to him.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21072 from mgaido91/SPARK-23973.
2018-04-24 10:11:09 +08:00
Tathagata Das 770add81c3 [SPARK-23004][SS] Ensure StateStore.commit is called only once in a streaming aggregation task
## What changes were proposed in this pull request?

A structured streaming query with a streaming aggregation can throw the following error in rare cases. 

```
java.lang.IllegalStateException: Cannot commit after already committed or aborted
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:643)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:135)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2$$anonfun$hasNext$2.apply$mcV$sp(statefulOperators.scala:359)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:102)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:251)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2.hasNext(statefulOperators.scala:359)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:188)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:42)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:336)
```

This can happen when the following conditions are accidentally hit. 
 - Streaming aggregation with aggregation function that is a subset of [`TypedImperativeAggregation`](76b8b840dd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala (L473)) (for example, `collect_set`, `collect_list`, `percentile`, etc.). 
 - Query running in `update}` mode
 - After the shuffle, a partition has exactly 128 records. 

This causes StateStore.commit to be called twice. See the [JIRA](https://issues.apache.org/jira/browse/SPARK-23004) for a more detailed explanation. The solution is to use `NextIterator` or `CompletionIterator`, each of which has a flag to prevent the "onCompletion" task from being called more than once. In this PR, I chose to implement using `NextIterator`.

## How was this patch tested?

Added unit test that I have confirm will fail without the fix.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21124 from tdas/SPARK-23004.
2018-04-23 13:20:32 -07:00
Takeshi Yamamuro afbdf42730 [SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `ExternalMapToCatalyst`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20980 from maropu/SPARK-23589.
2018-04-23 14:28:28 +02:00
Wenchen Fan d87d30e4fe [SPARK-23564][SQL] infer additional filters from constraints for join's children
## What changes were proposed in this pull request?

The existing query constraints framework has 2 steps:
1. propagate constraints bottom up.
2. use constraints to infer additional filters for better data pruning.

For step 2, it mostly helps with Join, because we can connect the constraints from children to the join condition and infer powerful filters to prune the data of the join sides. e.g., the left side has constraints `a = 1`, the join condition is `left.a = right.a`, then we can infer `right.a = 1` to the right side and prune the right side a lot.

However, the current logic of inferring filters from constraints for Join is pretty weak. It infers the filters from Join's constraints. Some joins like left semi/anti exclude output from right side and the right side constraints will be lost here.

This PR propose to check the left and right constraints individually, expand the constraints with join condition and add filters to children of join directly, instead of adding to the join condition.

This reverts https://github.com/apache/spark/pull/20670 , covers https://github.com/apache/spark/pull/20717 and https://github.com/apache/spark/pull/20816

This is inspired by the original PRs and the tests are all from these PRs. Thanks to the authors mgaido91 maryannxue KaiXinXiaoLei !

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21083 from cloud-fan/join.
2018-04-23 20:21:01 +08:00
Wenchen Fan f70f46d1e5 [SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the handling of Project and Filter over partitioned relation
## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/20988

`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation.

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21111 from cloud-fan/refactor.
2018-04-23 20:18:50 +08:00
Mykhailo Shtelma c48085aa91 [SPARK-23799][SQL] FilterEstimation.evaluateInSet produces devision by zero in a case of empty table with analyzed statistics
>What changes were proposed in this pull request?

During evaluation of IN conditions, if the source data frame, is represented by a plan, that uses hive table with columns, which were previously analysed, and the plan has conditions for these fields, that cannot be satisfied (which leads us to an empty data frame), FilterEstimation.evaluateInSet method produces NumberFormatException and ClassCastException.
In order to fix this bug, method FilterEstimation.evaluateInSet at first checks, if distinct count is not zero, and also checks if colStat.min and colStat.max  are defined, and only in this case proceeds with the calculation. If at least one of the conditions is not satisfied, zero is returned.

>How was this patch tested?

In order to test the PR two tests were implemented: one in FilterEstimationSuite, that tests the plan with the statistics that violates the conditions mentioned above,  and another one in StatisticsCollectionSuite, that test the whole process of analysis/optimisation of the query, that leads to the problems, mentioned in the first section.

Author: Mykhailo Shtelma <mykhailo.shtelma@bearingpoint.com>
Author: smikesh <mshtelma@gmail.com>

Closes #21052 from mshtelma/filter_estimation_evaluateInSet_Bugs.
2018-04-21 23:33:57 -07:00
gatorsmile 7bc853d089 [SPARK-24033][SQL] Fix Mismatched of Window Frame specifiedwindowframe(RowFrame, -1, -1)
## What changes were proposed in this pull request?

When the OffsetWindowFunction's frame is `UnaryMinus(Literal(1))` but the specified window frame has been simplified to `Literal(-1)` by some optimizer rules e.g., `ConstantFolding`. Thus, they do not match and cause the following error:
```
org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, -1, -1) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
at
```
## How was this patch tested?
Added a test

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21115 from gatorsmile/fixLag.
2018-04-21 10:45:12 -07:00
Marcelo Vanzin 1d758dc73b Revert "[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky"
This reverts commit 0c94e48bc5.
2018-04-20 10:23:01 -07:00
Takeshi Yamamuro 0dd97f6ea4 [SPARK-23595][SQL] ValidateExternalType should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `ValidateExternalType`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20757 from maropu/SPARK-23595.
2018-04-20 15:02:27 +02:00
Takeshi Yamamuro 074a7f9053 [SPARK-23588][SQL][FOLLOW-UP] Resolve a map builder method per execution in CatalystToExternalMap
## What changes were proposed in this pull request?
This pr is a follow-up pr of #20979 and fixes code to resolve a map builder method per execution instead of per row in `CatalystToExternalMap`.

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21112 from maropu/SPARK-23588-FOLLOWUP.
2018-04-20 14:43:47 +02:00
mn-mikke e6b466084c [SPARK-23736][SQL] Extending the concat function to support array columns
## What changes were proposed in this pull request?
The PR adds a logic for easy concatenation of multiple array columns and covers:
- Concat expression has been extended to support array columns
- A Python wrapper

## How was this patch tested?
New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite
- typeCoercion/native/concat.sql

## Codegen examples
### Primitive-type elements
```
val df = Seq(
  (Seq(1 ,2), Seq(3, 4)),
  (Seq(1, 2, 3), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 070 */               project_numElements,
/* 071 */               4);
/* 072 */             if (project_size > 2147483632) {
/* 073 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_size +
/* 074 */                 " bytes of data due to exceeding the limit 2147483632 bytes" +
/* 075 */                 " for UnsafeArrayData.");
/* 076 */             }
/* 077 */
/* 078 */             byte[] project_array = new byte[(int)project_size];
/* 079 */             UnsafeArrayData project_arrayData = new UnsafeArrayData();
/* 080 */             Platform.putLong(project_array, 16, project_numElements);
/* 081 */             project_arrayData.pointTo(project_array, 16, (int)project_size);
/* 082 */             int project_counter = 0;
/* 083 */             for (int y = 0; y < 2; y++) {
/* 084 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 085 */                 if (args[y].isNullAt(z)) {
/* 086 */                   project_arrayData.setNullAt(project_counter);
/* 087 */                 } else {
/* 088 */                   project_arrayData.setInt(
/* 089 */                     project_counter,
/* 090 */                     args[y].getInt(z)
/* 091 */                   );
/* 092 */                 }
/* 093 */                 project_counter++;
/* 094 */               }
/* 095 */             }
/* 096 */             return project_arrayData;
/* 097 */           }
/* 098 */         }.concat(project_args);
/* 099 */         boolean project_isNull = project_value == null;
```

### Non-primitive-type elements
```
val df = Seq(
  (Seq("aa" ,"bb"), Seq("ccc", "ddd")),
  (Seq("x", "y"), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             Object[] project_arrayObjects = new Object[(int)project_numElements];
/* 070 */             int project_counter = 0;
/* 071 */             for (int y = 0; y < 2; y++) {
/* 072 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 073 */                 project_arrayObjects[project_counter] = args[y].getUTF8String(z);
/* 074 */                 project_counter++;
/* 075 */               }
/* 076 */             }
/* 077 */             return new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObjects);
/* 078 */           }
/* 079 */         }.concat(project_args);
/* 080 */         boolean project_isNull = project_value == null;
```

Author: mn-mikke <mrkAha12346github>

Closes #20858 from mn-mikke/feature/array-api-concat_arrays-to-master.
2018-04-20 14:58:11 +09:00
Ryan Blue b3fde5a41e [SPARK-23877][SQL] Use filter predicates to prune partitions in metadata-only queries
## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.

## How was this patch tested?

Existing tests for metadata-only queries.

Author: Ryan Blue <blue@apache.org>

Closes #20988 from rdblue/SPARK-23877-metadata-only-push-filters.
2018-04-20 12:06:41 +08:00
“attilapiros” 9ea8d3d31b [SPARK-22362][SQL] Add unit test for Window Aggregate Functions
## What changes were proposed in this pull request?

Improving the test coverage of window functions focusing on missing test for window aggregate functions. No new UDAF test is added as it has been tested already.

## How was this patch tested?

Only new tests were added, automated tests were executed.

Author: “attilapiros” <piros.attila.zsolt@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>

Closes #20046 from attilapiros/SPARK-22362.
2018-04-19 18:55:59 +02:00
Wenchen Fan 6e19f7683f [SPARK-23989][SQL] exchange should copy data before non-serialized shuffle
## What changes were proposed in this pull request?

In Spark SQL, we usually reuse the `UnsafeRow` instance and need to copy the data when a place buffers non-serialized objects.

Shuffle may buffer objects if we don't make it to the bypass merge shuffle or unsafe shuffle.

`ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` misses the case that, if `spark.sql.shuffle.partitions` is large enough, we could fail to run unsafe shuffle and go with the non-serialized shuffle.

This bug is very hard to hit since users wouldn't set such a large number of partitions(16 million) for Spark SQL exchange.

TODO: test

## How was this patch tested?

todo.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21101 from cloud-fan/shuffle.
2018-04-19 17:54:53 +02:00
Xingbo Jiang d96c3e33cc [SPARK-21811][SQL] Fix the inconsistency behavior when finding the widest common type
## What changes were proposed in this pull request?

Currently we find the wider common type by comparing the two types from left to right, this can be a problem when you have two data types which don't have a common type but each can be promoted to StringType.

For instance, if you have a table with the schema:
[c1: date, c2: string, c3: int]

The following succeeds:
SELECT coalesce(c1, c2, c3) FROM table

While the following produces an exception:
SELECT coalesce(c1, c3, c2) FROM table

This is only a issue when the seq of dataTypes contains `StringType` and all the types can do string promotion.

close #19033

## How was this patch tested?

Add test in `TypeCoercionSuite`

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21074 from jiangxb1987/typeCoercion.
2018-04-19 21:21:22 +08:00
jinxing 9e10f69df5 [SPARK-22676][FOLLOW-UP] fix code style for test.
## What changes were proposed in this pull request?

This pr address comments in https://github.com/apache/spark/pull/19868 ;
Fix the code style for `org.apache.spark.sql.hive.QueryPartitionSuite` by using:
`withTempView`, `withTempDir`, `withTable`...

Author: jinxing <jinxing6042@126.com>

Closes #21091 from jinxing64/SPARK-22676-FOLLOW-UP.
2018-04-19 21:07:21 +08:00
Takeshi Yamamuro e13416502f [SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `CatalystToExternalMap`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20979 from maropu/SPARK-23588.
2018-04-19 14:42:50 +02:00
Takeshi Yamamuro 1b08c4393c [SPARK-23584][SQL] NewInstance should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `NewInstance`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20778 from maropu/SPARK-23584.
2018-04-19 14:38:26 +02:00
Kazuaki Ishizaki 46bb2b5129 [SPARK-23924][SQL] Add element_at function
## What changes were proposed in this pull request?

The PR adds the SQL function `element_at`. The behavior of the function is based on Presto's one.

This function returns element of array at given index in value if column is array, or returns value for the given key in value if column is map.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21053 from kiszk/SPARK-23924.
2018-04-19 21:00:10 +09:00
Kazuaki Ishizaki d5bec48b9c [SPARK-23919][SQL] Add array_position function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one.

The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21037 from kiszk/SPARK-23919.
2018-04-19 11:59:17 +09:00
Gabor Somogyi 0c94e48bc5 [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays sometimes in an infinite loop and times out the build.

There were multiple issues with the test:

1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset which ended up in canceling the same stage 2 times. This caused the infinite wait.

This PR solves this mentioned flakyness by removing the shared `DataFrameRangeSuite.stageToKill` and using `wait` and `CountDownLatch` for synhronization.

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20888 from gaborgsomogyi/SPARK-23775.
2018-04-18 16:37:41 -07:00
Liang-Chi Hsieh a9066478f6 [SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData
## What changes were proposed in this pull request?

Use specified accessor in `ArrayData.foreach` and `toArray`.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21099 from viirya/SPARK-23875-followup.
2018-04-19 00:05:47 +02:00
Takuya UESHIN f09a9e9418 [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.
## What changes were proposed in this pull request?

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```

The result should be empty but the result remains two rows.

## How was this patch tested?

Added a test.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe.
2018-04-18 08:22:05 -07:00
mn-mikke f81fa478ff [SPARK-23926][SQL] Extending reverse function to support ArrayType arguments
## What changes were proposed in this pull request?

This PR extends `reverse` functions to be able to operate over array columns and covers:
- Introduction of `Reverse` expression that represents logic for reversing arrays and also strings
- Removal of `StringReverse` expression
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(1, 3, 4, 2),
  null
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(reverse($"i")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = inputadapter_value.copy();
/* 051 */           for(int k = 0; k < project_length / 2; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             boolean isNullAtK = project_value.isNullAt(k);
/* 054 */             boolean isNullAtL = project_value.isNullAt(l);
/* 055 */             if(!isNullAtK) {
/* 056 */               int el = project_value.getInt(k);
/* 057 */               if(!isNullAtL) {
/* 058 */                 project_value.setInt(k, project_value.getInt(l));
/* 059 */               } else {
/* 060 */                 project_value.setNullAt(k);
/* 061 */               }
/* 062 */               project_value.setInt(l, el);
/* 063 */             } else if (!isNullAtL) {
/* 064 */               project_value.setInt(k, project_value.getInt(l));
/* 065 */               project_value.setNullAt(l);
/* 066 */             }
/* 067 */           }
/* 068 */
/* 069 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq("a", "c", "d", "b"),
  null
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(reverse($"s")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(new Object[project_length]);
/* 051 */           for(int k = 0; k < project_length; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             project_value.update(k, inputadapter_value.getUTF8String(l));
/* 054 */           }
/* 055 */
/* 056 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #21034 from mn-mikke/feature/array-api-reverse-to-master.
2018-04-18 18:41:55 +09:00
gatorsmile cce469435d [SPARK-24002][SQL] Task not serializable caused by org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes
## What changes were proposed in this pull request?
```
Py4JJavaError: An error occurred while calling o153.sql.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
	at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:293)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:226)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Exception thrown in Future.get:
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
	...
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	... 23 more
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Task not serializable
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
	... 276 more
Caused by: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:89)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:125)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:116)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:271)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:181)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:61)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:264)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:93)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:81)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:150)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:80)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.nio.BufferUnderflowException
	at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
	at java.nio.ByteBuffer.get(ByteBuffer.java:715)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
```

The Parquet filters are serializable but not thread safe. SparkPlan.prepare() could be called in different threads (BroadcastExchange will call it in a thread pool). Thus, we could serialize the same Parquet filter at the same time. This is not easily reproduced. The fix is to avoid serializing these Parquet filters in the driver. This PR is to avoid serializing these Parquet filters by moving the parquet filter generation from the driver to executors.

## How was this patch tested?
Having two queries one is a 1000-line SQL query and a 3000-line SQL query. Need to run at least one hour with a heavy write workload to reproduce once.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21086 from gatorsmile/taskNotSerializable.
2018-04-17 21:03:57 -07:00
Wenchen Fan 310a8cd062 [SPARK-23341][SQL] define some standard options for data source v2
## What changes were proposed in this pull request?

Each data source implementation can define its own options and teach its users how to set them. Spark doesn't have any restrictions about what options a data source should or should not have. It's possible that some options are very common and many data sources use them. However different data sources may define the common options(key and meaning) differently, which is quite confusing to end users.

This PR defines some standard options that data sources can optionally adopt: path, table and database.

## How was this patch tested?

a new test case.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20535 from cloud-fan/options.
2018-04-18 11:51:10 +08:00
maryannxue 1e3b8762a8 [SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns
## What changes were proposed in this pull request?

Added `TransitPredicateInOuterJoin` optimization rule that transits constraints from the preserved side of an outer join to the null-supplying side. The constraints of the join operator will remain unchanged.

## How was this patch tested?

Added 3 tests in `InferFiltersFromConstraintsSuite`.

Author: maryannxue <maryann.xue@gmail.com>

Closes #20816 from maryannxue/spark-21479.
2018-04-18 10:36:41 +08:00
Marco Gaido f39e82ce15 [SPARK-23986][SQL] freshName can generate non-unique names
## What changes were proposed in this pull request?

We are using `CodegenContext.freshName` to get a unique name for any new variable we are adding. Unfortunately, this method currently fails to create a unique name when we request more than one instance of variables with starting name `name1` and an instance with starting name `name11`.

The PR changes the way a new name is generated by `CodegenContext.freshName` so that we generate unique names in this scenario too.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21080 from mgaido91/SPARK-23986.
2018-04-18 00:35:44 +08:00
jinxing ed4101d29f [SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true
## What changes were proposed in this pull request?

In current code, it will scanning all partition paths when spark.sql.hive.verifyPartitionPath=true.
e.g. table like below:
```
CREATE TABLE `test`(
`id` int,
`age` int,
`name` string)
PARTITIONED BY (
`A` string,
`B` string)
load data local inpath '/tmp/data0' into table test partition(A='00', B='00')
load data local inpath '/tmp/data1' into table test partition(A='01', B='01')
load data local inpath '/tmp/data2' into table test partition(A='10', B='10')
load data local inpath '/tmp/data3' into table test partition(A='11', B='11')
```
If I query with SQL – "select * from test where A='00' and B='01'  ", current code will scan all partition paths including '/data/A=00/B=00', '/data/A=00/B=00', '/data/A=01/B=01', '/data/A=10/B=10', '/data/A=11/B=11'. It costs much time and memory cost.

This pr proposes to avoid iterating all partition paths. Add a config `spark.files.ignoreMissingFiles` and ignore the `file not found` when `getPartitions/compute`(for hive table scan). This is much like the logic brought by
`spark.sql.files.ignoreMissingFiles`(which is for datasource scan).

## How was this patch tested?
UT

Author: jinxing <jinxing6042@126.com>

Closes #19868 from jinxing64/SPARK-22676.
2018-04-17 21:52:33 +08:00
Marco Gaido 0a9172a05e [SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization
## What changes were proposed in this pull request?

There was no check on nullability for arguments of `Tuple`s. This could lead to have weird behavior when a null value had to be deserialized into a non-nullable Scala object: in those cases, the `null` got silently transformed in a valid value (like `-1` for `Int`), corresponding to the default value we are using in the SQL codebase. This situation was very likely to happen when deserializing to a Tuple of primitive Scala types (like Double, Int, ...).

The PR adds the `AssertNotNull` to arguments of tuples which have been asked to be converted to non-nullable types.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20976 from mgaido91/SPARK-23835.
2018-04-17 21:45:20 +08:00
Liang-Chi Hsieh 30ffb53cad [SPARK-23875][SQL] Add IndexedSeq wrapper for ArrayData
## What changes were proposed in this pull request?

We don't have a good way to sequentially access `UnsafeArrayData` with a common interface such as `Seq`. An example is `MapObject` where we need to access several sequence collection types together. But `UnsafeArrayData` doesn't implement `ArrayData.array`. Calling `toArray` will copy the entire array. We can provide an `IndexedSeq` wrapper for `ArrayData`, so we can avoid copying the entire array.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20984 from viirya/SPARK-23875.
2018-04-17 15:09:36 +02:00
Efim Poberezkin 05ae74778a [SPARK-23747][STRUCTURED STREAMING] Add EpochCoordinator unit tests
## What changes were proposed in this pull request?

Unit tests for EpochCoordinator that test correct sequencing of committed epochs. Several tests are ignored since they test functionality implemented in SPARK-23503 which is not yet merged, otherwise they fail.

Author: Efim Poberezkin <efim@poberezkin.ru>

Closes #20983 from efimpoberezkin/pr/EpochCoordinator-tests.
2018-04-17 04:13:17 -07:00
Jose Torres 1cc66a072b [SPARK-23687][SS] Add a memory source for continuous processing.
## What changes were proposed in this pull request?

Add a memory source for continuous processing.

Note that only one of the ContinuousSuite tests is migrated to minimize the diff here. I'll submit a second PR for SPARK-23688 to change the rest and get rid of waitForRateSourceTriggers.

## How was this patch tested?

unit test

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #20828 from jose-torres/continuousMemory.
2018-04-17 01:59:38 -07:00
Marco Gaido 14844a62c0 [SPARK-23918][SQL] Add array_min function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_min`. It takes an array as argument and returns the minimum value in it.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21025 from mgaido91/SPARK-23918.
2018-04-17 17:55:35 +09:00
Liang-Chi Hsieh fd990a908b [SPARK-23873][SQL] Use accessors in interpreted LambdaVariable
## What changes were proposed in this pull request?

Currently, interpreted execution of `LambdaVariable` just uses `InternalRow.get` to access element. We should use specified accessors if possible.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #20981 from viirya/SPARK-23873.
2018-04-16 22:45:57 +02:00
Marco Gaido 6931022031 [SPARK-23917][SQL] Add array_max function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_max`. It takes an array as argument and returns the maximum value in it.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21024 from mgaido91/SPARK-23917.
2018-04-15 21:45:55 -07:00
Liang-Chi Hsieh 73f28530d6 [SPARK-23979][SQL] MultiAlias should not be a CodegenFallback
## What changes were proposed in this pull request?

Just found `MultiAlias` is a `CodegenFallback`. It should not be as looks like `MultiAlias` won't be evaluated.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21065 from viirya/multialias-without-codegenfallback.
2018-04-14 08:59:04 +08:00
Tathagata Das cbb41a0c5b [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface
## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured Streaming must be written atomically such that no partial files are generated (would break fault-tolerance guarantees). Currently, there are 3 locations which try to do this individually, and in some cases, incorrectly.

1. HDFSOffsetMetadataLog - This uses a FileManager interface to use any implementation of `FileSystem` or `FileContext` APIs. It preferably loads `FileContext` implementation as FileContext of HDFS has atomic renames.
1. HDFSBackedStateStore (aka in-memory state store)
  - Writing a version.delta file - This uses FileSystem APIs only to perform a rename. This is incorrect as rename is not atomic in HDFS FileSystem implementation.
  - Writing a snapshot file - Same as above.

#### Current problems:
1. State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename.
1. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename for writing atomically and more efficiently. For example, with S3 you can write directly to the final file and it will be made visible only when the entire file is written and closed correctly. Any failure can be made to terminate the writing without making any partial files visible in S3. The current code does not abstract out this mechanism enough that it can be customized.

#### Solution:

1. Introduce a common interface that all 3 cases above can use to write checkpoint files atomically.
2. This interface must provide the necessary interfaces that allow customization of the write-and-rename mechanism.

This PR does that by introducing the interface `CheckpointFileManager` and modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface. Similar to earlier `FileManager`, there are implementations based on `FileSystem` and `FileContext` APIs, and the latter implementation is preferred to make it work correctly with HDFS.

The key method this interface has is `createAtomic(path, overwrite)` which returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All users of this method need to either call `close()` to successfully write the file, or `cancel()` in case of an error.

## How was this patch tested?
New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21048 from tdas/SPARK-23966.
2018-04-13 16:31:39 -07:00
Bruce Robbins 558f31b31c [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table
## What changes were proposed in this pull request?

TableReader would get disproportionately slower as the number of columns in the query increased.

I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better.

## How was this patch tested?

Manual testing
All sbt unit tests
python sql tests

Author: Bruce Robbins <bersprockets@gmail.com>

Closes #21043 from bersprockets/tabreadfix.
2018-04-13 14:05:04 -07:00
Marco Gaido 25892f3cc9 [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
## What changes were proposed in this pull request?

Added a new rule to remove Sort operation when its child is already sorted.
For instance, this simple code:
```
spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1")
val df = sql(s"""SELECT b
                | FROM (
                |     SELECT a, b
                |     FROM table1
                |     ORDER BY a
                | ) t
                | ORDER BY a""".stripMargin)
df.explain(true)
```
before the PR produces this plan:
```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- Sort [a#6 ASC NULLS FIRST], true
         +- Project [_1#3 AS a#6, _2#4 AS b#7]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
               +- ExternalRDD [obj#2]

== Physical Plan ==
*(3) Project [b#7]
+- *(3) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
      +- *(2) Project [b#7, a#6]
         +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
               +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
                     +- Scan ExternalRDDScan[obj#2]
```

while after the PR produces:

```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [_1#3 AS a#6, _2#4 AS b#7]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*(2) Project [b#7]
+- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 5)
      +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
            +- Scan ExternalRDDScan[obj#2]
```

this means that an unnecessary sort operation is not performed after the PR.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20560 from mgaido91/SPARK-23375.
2018-04-14 01:01:00 +08:00
Gengliang Wang 4dfd746de3 [SPARK-23896][SQL] Improve PartitioningAwareFileIndex
## What changes were proposed in this pull request?

Currently `PartitioningAwareFileIndex` accepts an optional parameter `userPartitionSchema`. If provided, it will combine the inferred partition schema with the parameter.

However,
1. to get `userPartitionSchema`, we need to  combine inferred partition schema with `userSpecifiedSchema`
2. to get the inferred partition schema, we have to create a temporary file index.

Only after that, a final version of `PartitioningAwareFileIndex` can be created.

This can be improved by passing `userSpecifiedSchema` to `PartitioningAwareFileIndex`.

With the improvement, we can reduce redundant code and avoid parsing the file partition twice.
## How was this patch tested?
Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21004 from gengliangwang/PartitioningAwareFileIndex.
2018-04-14 00:22:38 +08:00
yucai 0323e61465 [SPARK-23905][SQL] Add UDF weekday
## What changes were proposed in this pull request?

Add UDF weekday

## How was this patch tested?

A new test

Author: yucai <yyu1@ebay.com>

Closes #21009 from yucai/SPARK-23905.
2018-04-13 00:00:04 -07:00