Commit graph

21795 commits

Author SHA1 Message Date
Steve Loughran ce7ba2e98e [SPARK-23807][BUILD] Add Hadoop 3.1 profile with relevant POM fix ups
## What changes were proposed in this pull request?

1. Adds a `hadoop-3.1` profile build depending on the hadoop-3.1 artifacts.
1. In the hadoop-cloud module, adds an explicit hadoop-3.1 profile which switches from explicitly pulling in cloud connectors (hadoop-openstack, hadoop-aws, hadoop-azure) to depending on the hadoop-cloudstorage POM artifact, which pulls these in, has pre-excluded things like hadoop-common, and stays up to date with new connectors (hadoop-azuredatalake, hadoop-allyun). Goal: it becomes the Hadoop projects homework of keeping this clean, and the spark project doesn't need to handle new hadoop releases adding more dependencies.
1. the hadoop-cloud/hadoop-3.1 profile also declares support for jetty-ajax and jetty-util to ensure that these jars get into the distribution jar directory when needed by unshaded libraries.
1. Increases the curator and zookeeper versions to match those in hadoop-3, fixing spark core to build in sbt with the hadoop-3 dependencies.

## How was this patch tested?

* Everything this has been built and tested against both ASF Hadoop branch-3.1 and hadoop trunk.
* spark-shell was used to create connectors to all the stores and verify that file IO could take place.

The spark hive-1.2.1 JAR has problems here, as it's version check logic fails for Hadoop versions > 2.

This can be avoided with either of

* The hadoop JARs built to declare their version as Hadoop 2.11  `mvn install -DskipTests -DskipShade -Ddeclared.hadoop.version=2.11` . This is safe for local test runs, not for deployment (HDFS is very strict about cross-version deployment).
* A modified version of spark hive whose version check switch statement is happy with hadoop 3.

I've done both, with maven and SBT.

Three issues surfaced

1. A spark-core test failure —fixed in SPARK-23787.
1. SBT only: Zookeeper not being found in spark-core. Somehow curator 2.12.0 triggers some slightly different dependency resolution logic from previous versions, and Ivy was missing zookeeper.jar entirely. This patch adds the explicit declaration for all spark profiles, setting the ZK version = 3.4.9 for hadoop-3.1
1. Marking jetty-utils as provided in spark was stopping hadoop-azure from being able to instantiate the azure wasb:// client; it was using jetty-util-ajax, which could then not find a class in jetty-util.

Author: Steve Loughran <stevel@hortonworks.com>

Closes #20923 from steveloughran/cloud/SPARK-23807-hadoop-31.
2018-04-24 09:57:09 -07:00
Lu WANG 2a24c481da [SPARK-23975][ML] Allow Clustering to take Arrays of Double as input features
## What changes were proposed in this pull request?

- Multiple possible input types is added in validateAndTransformSchema() and computeCost() while checking column type

- Add if statement in transform() to support array type as featuresCol

- Add the case statement in fit() while selecting columns from dataset

These changes will be applied to KMeans first, then to other clustering method

## How was this patch tested?

unit test is added

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Lu WANG <lu.wang@databricks.com>

Closes #21081 from ludatabricks/SPARK-23975.
2018-04-24 09:25:41 -07:00
Julien Cuquemelle 55c4ca88a3 [SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation
## What changes were proposed in this pull request?

By default, the dynamic allocation will request enough executors to maximize the
parallelism according to the number of tasks to process. While this minimizes the
latency of the job, with small tasks this setting can waste a lot of resources due to
executor allocation overhead, as some executor might not even do any work.
This setting allows to set a ratio that will be used to reduce the number of
target executors w.r.t. full parallelism.

The number of executors computed with this setting is still fenced by
`spark.dynamicAllocation.maxExecutors` and `spark.dynamicAllocation.minExecutors`

## How was this patch tested?
Units tests and runs on various actual workloads on a Yarn Cluster

Author: Julien Cuquemelle <j.cuquemelle@criteo.com>

Closes #19881 from jcuquemelle/AddTaskPerExecutorSlot.
2018-04-24 10:56:55 -05:00
Takeshi Yamamuro 4926a7c2f0 [SPARK-23589][SQL][FOLLOW-UP] Reuse InternalRow in ExternalMapToCatalyst eval
## What changes were proposed in this pull request?
This pr is a follow-up of #20980 and fixes code to reuse `InternalRow` for converting input keys/values in `ExternalMapToCatalyst` eval.

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21137 from maropu/SPARK-23589-FOLLOWUP.
2018-04-24 17:52:05 +02:00
hyukjinkwon 87e8a572be [SPARK-24054][R] Add array_position function / element_at functions
## What changes were proposed in this pull request?

This PR proposes to add array_position and element_at in R side too.

array_position:

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_array(df$gear, df$am, df$carb))
head(select(mutated, array_position(mutated$v1, 1)))
```

```
  array_position(v1, 1.0)
1                       2
2                       2
3                       2
4                       3
5                       0
6                       3
```

element_at:

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
head(select(mutated, element_at(mutated$v1, 1)))
```

```
  element_at(v1, 1.0)
1                21.0
2                21.0
3                22.8
4                21.4
5                18.7
6                18.1
```

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_map(df$model, df$cyl))
head(select(mutated, element_at(mutated$v1, "Valiant")))
```

```
  element_at(v3, Valiant)
1                      NA
2                      NA
3                      NA
4                      NA
5                      NA
6                       6
```

## How was this patch tested?

Unit tests were added in `R/pkg/tests/fulltests/test_sparkSQL.R` and manually tested. Documentation was manually built and verified.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21130 from HyukjinKwon/sparkr_array_position_element_at.
2018-04-24 16:18:20 +08:00
seancxmao c303b1b676 [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId
## What changes were proposed in this pull request?
Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: d28d5732ae/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala (L66)

## How was this patch tested?
N/A

Author: seancxmao <seancxmao@gmail.com>

Closes #21113 from seancxmao/SPARK-13136.
2018-04-24 16:16:07 +08:00
Marco Gaido 281c1ca0dc [SPARK-23973][SQL] Remove consecutive Sorts
## What changes were proposed in this pull request?

In SPARK-23375 we introduced the ability of removing `Sort` operation during query optimization if the data is already sorted. In this follow-up we remove also a `Sort` which is followed by another `Sort`: in this case the first sort is not needed and can be safely removed.

The PR starts from henryr's comment: https://github.com/apache/spark/pull/20560#discussion_r180601594. So credit should be given to him.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21072 from mgaido91/SPARK-23973.
2018-04-24 10:11:09 +08:00
Marcelo Vanzin 428b903859 [SPARK-24029][CORE] Follow up: set SO_REUSEADDR on the server socket.
"childOption" is for the remote connections, not for the server socket
that actually listens for incoming connections.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21132 from vanzin/SPARK-24029.2.
2018-04-24 09:10:29 +08:00
wuyi c8f3ac69d1 [SPARK-23888][CORE] correct the comment of hasAttemptOnHost()
TaskSetManager.hasAttemptOnHost had a misleading comment.  The comment
said that it only checked for running tasks, but really it checked for
any tasks that might have run in the past as well.  This updates to line
up with the implementation.

Author: wuyi <ngone_5451@163.com>

Closes #20998 from Ngone51/SPARK-23888.
2018-04-23 15:35:50 -05:00
Holden Karau e82cb68349
[SPARK-11237][ML] Add pmml export for k-means in Spark ML
## What changes were proposed in this pull request?

Adding PMML export to Spark ML's KMeans Model.

## How was this patch tested?

New unit test for Spark ML PMML export based on the old Spark MLlib unit test.

Author: Holden Karau <holden@pigscanfly.ca>

Closes #20907 from holdenk/SPARK-11237-Add-PMML-Export-for-KMeans.
2018-04-23 13:23:02 -07:00
Tathagata Das 770add81c3 [SPARK-23004][SS] Ensure StateStore.commit is called only once in a streaming aggregation task
## What changes were proposed in this pull request?

A structured streaming query with a streaming aggregation can throw the following error in rare cases. 

```
java.lang.IllegalStateException: Cannot commit after already committed or aborted
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:643)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:135)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2$$anonfun$hasNext$2.apply$mcV$sp(statefulOperators.scala:359)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:102)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:251)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2.hasNext(statefulOperators.scala:359)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:188)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:42)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:336)
```

This can happen when the following conditions are accidentally hit. 
 - Streaming aggregation with aggregation function that is a subset of [`TypedImperativeAggregation`](76b8b840dd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala (L473)) (for example, `collect_set`, `collect_list`, `percentile`, etc.). 
 - Query running in `update}` mode
 - After the shuffle, a partition has exactly 128 records. 

This causes StateStore.commit to be called twice. See the [JIRA](https://issues.apache.org/jira/browse/SPARK-23004) for a more detailed explanation. The solution is to use `NextIterator` or `CompletionIterator`, each of which has a flag to prevent the "onCompletion" task from being called more than once. In this PR, I chose to implement using `NextIterator`.

## How was this patch tested?

Added unit test that I have confirm will fail without the fix.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21124 from tdas/SPARK-23004.
2018-04-23 13:20:32 -07:00
liuzhaokun 448d248f89 [SPARK-21168] KafkaRDD should always set kafka clientId.
[https://issues.apache.org/jira/browse/SPARK-21168](https://issues.apache.org/jira/browse/SPARK-21168)
There are no a number of other places that a client ID should be set,and I think we should use consumer.clientId in the clientId method,because the fetch request  will be used by the same consumer behind.

Author: liuzhaokun <liu.zhaokun@zte.com.cn>

Closes #19887 from liu-zhaokun/master1205.
2018-04-23 13:56:11 -05:00
Teng Peng 293a0f29e3
[Spark-24024][ML] Fix poisson deviance calculations in GLM to handle y = 0
## What changes were proposed in this pull request?

It is reported by Spark users that the deviance calculation for poisson regression does not handle y = 0. Thus, the correct model summary cannot be obtained. The user has confirmed the the issue is in
```
override def deviance(y: Double, mu: Double, weight: Double): Double =
{ 2.0 * weight * (y * math.log(y / mu) - (y - mu)) }
when y = 0.
```

The user also mentioned there are many other places he believe we should check the same thing. However, no other changes are needed, including Gamma distribution.

## How was this patch tested?
Add a comparison with R deviance calculation to the existing unit test.

Author: Teng Peng <josephtengpeng@gmail.com>

Closes #21125 from tengpeng/Spark24024GLM.
2018-04-23 10:29:47 -07:00
Takeshi Yamamuro afbdf42730 [SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `ExternalMapToCatalyst`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20980 from maropu/SPARK-23589.
2018-04-23 14:28:28 +02:00
Wenchen Fan d87d30e4fe [SPARK-23564][SQL] infer additional filters from constraints for join's children
## What changes were proposed in this pull request?

The existing query constraints framework has 2 steps:
1. propagate constraints bottom up.
2. use constraints to infer additional filters for better data pruning.

For step 2, it mostly helps with Join, because we can connect the constraints from children to the join condition and infer powerful filters to prune the data of the join sides. e.g., the left side has constraints `a = 1`, the join condition is `left.a = right.a`, then we can infer `right.a = 1` to the right side and prune the right side a lot.

However, the current logic of inferring filters from constraints for Join is pretty weak. It infers the filters from Join's constraints. Some joins like left semi/anti exclude output from right side and the right side constraints will be lost here.

This PR propose to check the left and right constraints individually, expand the constraints with join condition and add filters to children of join directly, instead of adding to the join condition.

This reverts https://github.com/apache/spark/pull/20670 , covers https://github.com/apache/spark/pull/20717 and https://github.com/apache/spark/pull/20816

This is inspired by the original PRs and the tests are all from these PRs. Thanks to the authors mgaido91 maryannxue KaiXinXiaoLei !

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21083 from cloud-fan/join.
2018-04-23 20:21:01 +08:00
Wenchen Fan f70f46d1e5 [SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the handling of Project and Filter over partitioned relation
## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/20988

`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation.

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21111 from cloud-fan/refactor.
2018-04-23 20:18:50 +08:00
Kazuaki Ishizaki c3a86faa53 [SPARK-10399][SPARK-23879][FOLLOWUP][CORE] Free unused off-heap memory in MemoryBlockSuite
## What changes were proposed in this pull request?

As viirya pointed out [here](https://github.com/apache/spark/pull/19222#discussion_r179910484), this PR explicitly frees unused off-heap memory in `MemoryBlockSuite`

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21117 from kiszk/SPARK-10399-free-offheap.
2018-04-23 10:45:25 +08:00
Mykhailo Shtelma c48085aa91 [SPARK-23799][SQL] FilterEstimation.evaluateInSet produces devision by zero in a case of empty table with analyzed statistics
>What changes were proposed in this pull request?

During evaluation of IN conditions, if the source data frame, is represented by a plan, that uses hive table with columns, which were previously analysed, and the plan has conditions for these fields, that cannot be satisfied (which leads us to an empty data frame), FilterEstimation.evaluateInSet method produces NumberFormatException and ClassCastException.
In order to fix this bug, method FilterEstimation.evaluateInSet at first checks, if distinct count is not zero, and also checks if colStat.min and colStat.max  are defined, and only in this case proceeds with the calculation. If at least one of the conditions is not satisfied, zero is returned.

>How was this patch tested?

In order to test the PR two tests were implemented: one in FilterEstimationSuite, that tests the plan with the statistics that violates the conditions mentioned above,  and another one in StatisticsCollectionSuite, that test the whole process of analysis/optimisation of the query, that leads to the problems, mentioned in the first section.

Author: Mykhailo Shtelma <mykhailo.shtelma@bearingpoint.com>
Author: smikesh <mshtelma@gmail.com>

Closes #21052 from mshtelma/filter_estimation_evaluateInSet_Bugs.
2018-04-21 23:33:57 -07:00
gatorsmile 7bc853d089 [SPARK-24033][SQL] Fix Mismatched of Window Frame specifiedwindowframe(RowFrame, -1, -1)
## What changes were proposed in this pull request?

When the OffsetWindowFunction's frame is `UnaryMinus(Literal(1))` but the specified window frame has been simplified to `Literal(-1)` by some optimizer rules e.g., `ConstantFolding`. Thus, they do not match and cause the following error:
```
org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, -1, -1) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
at
```
## How was this patch tested?
Added a test

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21115 from gatorsmile/fixLag.
2018-04-21 10:45:12 -07:00
Marcelo Vanzin 32b4bcd6d3 [SPARK-24029][CORE] Set SO_REUSEADDR on listen sockets.
This allows sockets to be bound even if there are sockets
from a previous application that are still pending closure. It
avoids bind issues when, for example, re-starting the SHS.

Don't enable the option on Windows though. The following page
explains some odd behavior that this option can have there:
https://msdn.microsoft.com/en-us/library/windows/desktop/ms740621%28v=vs.85%29.aspx

I intentionally ignored server sockets that always bind to
ephemeral ports, since those don't benefit from this option.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21110 from vanzin/SPARK-24029.
2018-04-21 23:14:58 +08:00
Marcelo Vanzin 1d758dc73b Revert "[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky"
This reverts commit 0c94e48bc5.
2018-04-20 10:23:01 -07:00
Takeshi Yamamuro 0dd97f6ea4 [SPARK-23595][SQL] ValidateExternalType should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `ValidateExternalType`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20757 from maropu/SPARK-23595.
2018-04-20 15:02:27 +02:00
Takeshi Yamamuro 074a7f9053 [SPARK-23588][SQL][FOLLOW-UP] Resolve a map builder method per execution in CatalystToExternalMap
## What changes were proposed in this pull request?
This pr is a follow-up pr of #20979 and fixes code to resolve a map builder method per execution instead of per row in `CatalystToExternalMap`.

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21112 from maropu/SPARK-23588-FOLLOWUP.
2018-04-20 14:43:47 +02:00
mn-mikke e6b466084c [SPARK-23736][SQL] Extending the concat function to support array columns
## What changes were proposed in this pull request?
The PR adds a logic for easy concatenation of multiple array columns and covers:
- Concat expression has been extended to support array columns
- A Python wrapper

## How was this patch tested?
New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite
- typeCoercion/native/concat.sql

## Codegen examples
### Primitive-type elements
```
val df = Seq(
  (Seq(1 ,2), Seq(3, 4)),
  (Seq(1, 2, 3), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 070 */               project_numElements,
/* 071 */               4);
/* 072 */             if (project_size > 2147483632) {
/* 073 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_size +
/* 074 */                 " bytes of data due to exceeding the limit 2147483632 bytes" +
/* 075 */                 " for UnsafeArrayData.");
/* 076 */             }
/* 077 */
/* 078 */             byte[] project_array = new byte[(int)project_size];
/* 079 */             UnsafeArrayData project_arrayData = new UnsafeArrayData();
/* 080 */             Platform.putLong(project_array, 16, project_numElements);
/* 081 */             project_arrayData.pointTo(project_array, 16, (int)project_size);
/* 082 */             int project_counter = 0;
/* 083 */             for (int y = 0; y < 2; y++) {
/* 084 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 085 */                 if (args[y].isNullAt(z)) {
/* 086 */                   project_arrayData.setNullAt(project_counter);
/* 087 */                 } else {
/* 088 */                   project_arrayData.setInt(
/* 089 */                     project_counter,
/* 090 */                     args[y].getInt(z)
/* 091 */                   );
/* 092 */                 }
/* 093 */                 project_counter++;
/* 094 */               }
/* 095 */             }
/* 096 */             return project_arrayData;
/* 097 */           }
/* 098 */         }.concat(project_args);
/* 099 */         boolean project_isNull = project_value == null;
```

### Non-primitive-type elements
```
val df = Seq(
  (Seq("aa" ,"bb"), Seq("ccc", "ddd")),
  (Seq("x", "y"), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             Object[] project_arrayObjects = new Object[(int)project_numElements];
/* 070 */             int project_counter = 0;
/* 071 */             for (int y = 0; y < 2; y++) {
/* 072 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 073 */                 project_arrayObjects[project_counter] = args[y].getUTF8String(z);
/* 074 */                 project_counter++;
/* 075 */               }
/* 076 */             }
/* 077 */             return new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObjects);
/* 078 */           }
/* 079 */         }.concat(project_args);
/* 080 */         boolean project_isNull = project_value == null;
```

Author: mn-mikke <mrkAha12346github>

Closes #20858 from mn-mikke/feature/array-api-concat_arrays-to-master.
2018-04-20 14:58:11 +09:00
Ryan Blue b3fde5a41e [SPARK-23877][SQL] Use filter predicates to prune partitions in metadata-only queries
## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.

## How was this patch tested?

Existing tests for metadata-only queries.

Author: Ryan Blue <blue@apache.org>

Closes #20988 from rdblue/SPARK-23877-metadata-only-push-filters.
2018-04-20 12:06:41 +08:00
Gabor Somogyi e55953b0bf [SPARK-24022][TEST] Make SparkContextSuite not flaky
## What changes were proposed in this pull request?

SparkContextSuite.test("Cancelling stages/jobs with custom reasons.") could stay in an infinite loop because of the problem found and fixed in [SPARK-23775](https://issues.apache.org/jira/browse/SPARK-23775).

This PR solves this mentioned flakyness by removing shared variable usages when cancel happens in a loop and using wait and CountDownLatch for synhronization.

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #21105 from gaborgsomogyi/SPARK-24022.
2018-04-19 15:06:27 -07:00
“attilapiros” 9ea8d3d31b [SPARK-22362][SQL] Add unit test for Window Aggregate Functions
## What changes were proposed in this pull request?

Improving the test coverage of window functions focusing on missing test for window aggregate functions. No new UDAF test is added as it has been tested already.

## How was this patch tested?

Only new tests were added, automated tests were executed.

Author: “attilapiros” <piros.attila.zsolt@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>

Closes #20046 from attilapiros/SPARK-22362.
2018-04-19 18:55:59 +02:00
wm624@hotmail.com a471880afb [SPARK-24026][ML] Add Power Iteration Clustering to spark.ml
## What changes were proposed in this pull request?

This PR adds PowerIterationClustering as a Transformer to spark.ml.  In the transform method, it calls spark.mllib's PowerIterationClustering.run() method and transforms the return value assignments (the Kmeans output of the pseudo-eigenvector) as a DataFrame (id: LongType, cluster: IntegerType).

This PR is copied and modified from https://github.com/apache/spark/pull/15770  The primary author is wangmiao1981

## How was this patch tested?

This PR has 2 types of tests:
* Copies of tests from spark.mllib's PIC tests
* New tests specific to the spark.ml APIs

Author: wm624@hotmail.com <wm624@hotmail.com>
Author: wangmiao1981 <wm624@hotmail.com>
Author: Joseph K. Bradley <joseph@databricks.com>

Closes #21090 from jkbradley/wangmiao1981-pic.
2018-04-19 09:40:20 -07:00
Wenchen Fan 6e19f7683f [SPARK-23989][SQL] exchange should copy data before non-serialized shuffle
## What changes were proposed in this pull request?

In Spark SQL, we usually reuse the `UnsafeRow` instance and need to copy the data when a place buffers non-serialized objects.

Shuffle may buffer objects if we don't make it to the bypass merge shuffle or unsafe shuffle.

`ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` misses the case that, if `spark.sql.shuffle.partitions` is large enough, we could fail to run unsafe shuffle and go with the non-serialized shuffle.

This bug is very hard to hit since users wouldn't set such a large number of partitions(16 million) for Spark SQL exchange.

TODO: test

## How was this patch tested?

todo.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21101 from cloud-fan/shuffle.
2018-04-19 17:54:53 +02:00
wuyi 0deaa52513 [SPARK-24021][CORE] fix bug in BlacklistTracker's updateBlacklistForFetchFailure
## What changes were proposed in this pull request?

There‘s a miswrite in BlacklistTracker's updateBlacklistForFetchFailure:
```
val blacklistedExecsOnNode =
    nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]())
blacklistedExecsOnNode += exec
```
where first **exec** should be **host**.
## How was this patch tested?

adjust existed test.

Author: wuyi <ngone_5451@163.com>

Closes #21104 from Ngone51/SPARK-24021.
2018-04-19 09:00:33 -05:00
Xingbo Jiang d96c3e33cc [SPARK-21811][SQL] Fix the inconsistency behavior when finding the widest common type
## What changes were proposed in this pull request?

Currently we find the wider common type by comparing the two types from left to right, this can be a problem when you have two data types which don't have a common type but each can be promoted to StringType.

For instance, if you have a table with the schema:
[c1: date, c2: string, c3: int]

The following succeeds:
SELECT coalesce(c1, c2, c3) FROM table

While the following produces an exception:
SELECT coalesce(c1, c3, c2) FROM table

This is only a issue when the seq of dataTypes contains `StringType` and all the types can do string promotion.

close #19033

## How was this patch tested?

Add test in `TypeCoercionSuite`

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21074 from jiangxb1987/typeCoercion.
2018-04-19 21:21:22 +08:00
jinxing 9e10f69df5 [SPARK-22676][FOLLOW-UP] fix code style for test.
## What changes were proposed in this pull request?

This pr address comments in https://github.com/apache/spark/pull/19868 ;
Fix the code style for `org.apache.spark.sql.hive.QueryPartitionSuite` by using:
`withTempView`, `withTempDir`, `withTable`...

Author: jinxing <jinxing6042@126.com>

Closes #21091 from jinxing64/SPARK-22676-FOLLOW-UP.
2018-04-19 21:07:21 +08:00
Takeshi Yamamuro e13416502f [SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `CatalystToExternalMap`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20979 from maropu/SPARK-23588.
2018-04-19 14:42:50 +02:00
Takeshi Yamamuro 1b08c4393c [SPARK-23584][SQL] NewInstance should support interpreted execution
## What changes were proposed in this pull request?
This pr supported interpreted mode for `NewInstance`.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20778 from maropu/SPARK-23584.
2018-04-19 14:38:26 +02:00
Kazuaki Ishizaki 46bb2b5129 [SPARK-23924][SQL] Add element_at function
## What changes were proposed in this pull request?

The PR adds the SQL function `element_at`. The behavior of the function is based on Presto's one.

This function returns element of array at given index in value if column is array, or returns value for the given key in value if column is map.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21053 from kiszk/SPARK-23924.
2018-04-19 21:00:10 +09:00
Kazuaki Ishizaki d5bec48b9c [SPARK-23919][SQL] Add array_position function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one.

The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21037 from kiszk/SPARK-23919.
2018-04-19 11:59:17 +09:00
Liang-Chi Hsieh 8bb0df2c65 [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener
## What changes were proposed in this pull request?

The `StreamingListener` in PySpark side seems to be lack of `onStreamingStarted` method. This patch adds it and a test for it.

This patch also includes a trivial doc improvement for `createDirectStream`.

Original PR is #21057.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21098 from viirya/SPARK-24014.
2018-04-19 10:00:57 +08:00
Gabor Somogyi 0c94e48bc5 [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays sometimes in an infinite loop and times out the build.

There were multiple issues with the test:

1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset which ended up in canceling the same stage 2 times. This caused the infinite wait.

This PR solves this mentioned flakyness by removing the shared `DataFrameRangeSuite.stageToKill` and using `wait` and `CountDownLatch` for synhronization.

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20888 from gaborgsomogyi/SPARK-23775.
2018-04-18 16:37:41 -07:00
Liang-Chi Hsieh a9066478f6 [SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData
## What changes were proposed in this pull request?

Use specified accessor in `ArrayData.foreach` and `toArray`.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21099 from viirya/SPARK-23875-followup.
2018-04-19 00:05:47 +02:00
Takuya UESHIN f09a9e9418 [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.
## What changes were proposed in this pull request?

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```

The result should be empty but the result remains two rows.

## How was this patch tested?

Added a test.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe.
2018-04-18 08:22:05 -07:00
mn-mikke f81fa478ff [SPARK-23926][SQL] Extending reverse function to support ArrayType arguments
## What changes were proposed in this pull request?

This PR extends `reverse` functions to be able to operate over array columns and covers:
- Introduction of `Reverse` expression that represents logic for reversing arrays and also strings
- Removal of `StringReverse` expression
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(1, 3, 4, 2),
  null
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(reverse($"i")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = inputadapter_value.copy();
/* 051 */           for(int k = 0; k < project_length / 2; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             boolean isNullAtK = project_value.isNullAt(k);
/* 054 */             boolean isNullAtL = project_value.isNullAt(l);
/* 055 */             if(!isNullAtK) {
/* 056 */               int el = project_value.getInt(k);
/* 057 */               if(!isNullAtL) {
/* 058 */                 project_value.setInt(k, project_value.getInt(l));
/* 059 */               } else {
/* 060 */                 project_value.setNullAt(k);
/* 061 */               }
/* 062 */               project_value.setInt(l, el);
/* 063 */             } else if (!isNullAtL) {
/* 064 */               project_value.setInt(k, project_value.getInt(l));
/* 065 */               project_value.setNullAt(l);
/* 066 */             }
/* 067 */           }
/* 068 */
/* 069 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq("a", "c", "d", "b"),
  null
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(reverse($"s")).debugCodegen
```
Result:
```
/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(new Object[project_length]);
/* 051 */           for(int k = 0; k < project_length; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             project_value.update(k, inputadapter_value.getUTF8String(l));
/* 054 */           }
/* 055 */
/* 056 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #21034 from mn-mikke/feature/array-api-reverse-to-master.
2018-04-18 18:41:55 +09:00
gatorsmile cce469435d [SPARK-24002][SQL] Task not serializable caused by org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes
## What changes were proposed in this pull request?
```
Py4JJavaError: An error occurred while calling o153.sql.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
	at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:293)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:226)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Exception thrown in Future.get:
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
	...
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	... 23 more
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Task not serializable
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
	... 276 more
Caused by: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:89)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:125)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:116)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:271)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:181)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:61)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:264)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:93)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:81)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:150)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:80)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.nio.BufferUnderflowException
	at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
	at java.nio.ByteBuffer.get(ByteBuffer.java:715)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
```

The Parquet filters are serializable but not thread safe. SparkPlan.prepare() could be called in different threads (BroadcastExchange will call it in a thread pool). Thus, we could serialize the same Parquet filter at the same time. This is not easily reproduced. The fix is to avoid serializing these Parquet filters in the driver. This PR is to avoid serializing these Parquet filters by moving the parquet filter generation from the driver to executors.

## How was this patch tested?
Having two queries one is a 1000-line SQL query and a 3000-line SQL query. Need to run at least one hour with a heavy write workload to reproduce once.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21086 from gatorsmile/taskNotSerializable.
2018-04-17 21:03:57 -07:00
Wenchen Fan 310a8cd062 [SPARK-23341][SQL] define some standard options for data source v2
## What changes were proposed in this pull request?

Each data source implementation can define its own options and teach its users how to set them. Spark doesn't have any restrictions about what options a data source should or should not have. It's possible that some options are very common and many data sources use them. However different data sources may define the common options(key and meaning) differently, which is quite confusing to end users.

This PR defines some standard options that data sources can optionally adopt: path, table and database.

## How was this patch tested?

a new test case.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20535 from cloud-fan/options.
2018-04-18 11:51:10 +08:00
maryannxue 1e3b8762a8 [SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns
## What changes were proposed in this pull request?

Added `TransitPredicateInOuterJoin` optimization rule that transits constraints from the preserved side of an outer join to the null-supplying side. The constraints of the join operator will remain unchanged.

## How was this patch tested?

Added 3 tests in `InferFiltersFromConstraintsSuite`.

Author: maryannxue <maryann.xue@gmail.com>

Closes #20816 from maryannxue/spark-21479.
2018-04-18 10:36:41 +08:00
jerryshao 5fccdae189 [SPARK-22968][DSTREAM] Throw an exception on partition revoking issue
## What changes were proposed in this pull request?

Kafka partitions can be revoked when new consumers joined in the consumer group to rebalance the partitions. But current Spark Kafka connector code makes sure there's no partition revoking scenarios, so trying to get latest offset from revoked partitions will throw exceptions as JIRA mentioned.

Partition revoking happens when new consumer joined the consumer group, which means different streaming apps are trying to use same group id. This is fundamentally not correct, different apps should use different consumer group. So instead of throwing an confused exception from Kafka, improve the exception message by identifying revoked partition and directly throw an meaningful exception when partition is revoked.

Besides, this PR also fixes bugs in `DirectKafkaWordCount`, this example simply cannot be worked without the fix.

```
8/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, kssh-1] for group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined group use_a_separate_group_id_for_each_stream with generation 4
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group use_a_separate_group_id_for_each_stream
```

## How was this patch tested?

This is manually verified in local cluster, unfortunately I'm not sure how to simulate it in UT, so propose the PR without UT added.

Author: jerryshao <sshao@hortonworks.com>

Closes #21038 from jerryshao/SPARK-22968.
2018-04-17 21:08:42 -05:00
WeichenXu 1ca3c50fef [SPARK-21741][ML][PYSPARK] Python API for DataFrame-based multivariate summarizer
## What changes were proposed in this pull request?

Python API for DataFrame-based multivariate summarizer.

## How was this patch tested?

doctest added.

Author: WeichenXu <weichen.xu@databricks.com>

Closes #20695 from WeichenXu123/py_summarizer.
2018-04-17 10:11:08 -07:00
Marco Gaido f39e82ce15 [SPARK-23986][SQL] freshName can generate non-unique names
## What changes were proposed in this pull request?

We are using `CodegenContext.freshName` to get a unique name for any new variable we are adding. Unfortunately, this method currently fails to create a unique name when we request more than one instance of variables with starting name `name1` and an instance with starting name `name11`.

The PR changes the way a new name is generated by `CodegenContext.freshName` so that we generate unique names in this scenario too.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21080 from mgaido91/SPARK-23986.
2018-04-18 00:35:44 +08:00
jinxing 3990daaf3b [SPARK-23948] Trigger mapstage's job listener in submitMissingTasks
## What changes were proposed in this pull request?

SparkContext submitted a map stage from `submitMapStage` to `DAGScheduler`,
`markMapStageJobAsFinished` is called only in (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933 and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314);

But think about below scenario:
1. stage0 and stage1 are all `ShuffleMapStage` and stage1 depends on stage0;
2. We submit stage1 by `submitMapStage`;
3. When stage 1 running, `FetchFailed` happened, stage0 and stage1 got resubmitted as stage0_1 and stage1_1;
4. When stage0_1 running, speculated tasks in old stage1 come as succeeded, but stage1 is not inside `runningStages`. So even though all splits(including the speculated tasks) in stage1 succeeded, job listener in stage1 will not be called;
5. stage0_1 finished, stage1_1 starts running. When `submitMissingTasks`, there is no missing tasks. But in current code, job listener is not triggered.

We should call the job listener for map stage in `5`.

## How was this patch tested?

Not added yet.

Author: jinxing <jinxing6042@126.com>

Closes #21019 from jinxing64/SPARK-23948.
2018-04-17 08:55:01 -05:00
jinxing ed4101d29f [SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true
## What changes were proposed in this pull request?

In current code, it will scanning all partition paths when spark.sql.hive.verifyPartitionPath=true.
e.g. table like below:
```
CREATE TABLE `test`(
`id` int,
`age` int,
`name` string)
PARTITIONED BY (
`A` string,
`B` string)
load data local inpath '/tmp/data0' into table test partition(A='00', B='00')
load data local inpath '/tmp/data1' into table test partition(A='01', B='01')
load data local inpath '/tmp/data2' into table test partition(A='10', B='10')
load data local inpath '/tmp/data3' into table test partition(A='11', B='11')
```
If I query with SQL – "select * from test where A='00' and B='01'  ", current code will scan all partition paths including '/data/A=00/B=00', '/data/A=00/B=00', '/data/A=01/B=01', '/data/A=10/B=10', '/data/A=11/B=11'. It costs much time and memory cost.

This pr proposes to avoid iterating all partition paths. Add a config `spark.files.ignoreMissingFiles` and ignore the `file not found` when `getPartitions/compute`(for hive table scan). This is much like the logic brought by
`spark.sql.files.ignoreMissingFiles`(which is for datasource scan).

## How was this patch tested?
UT

Author: jinxing <jinxing6042@126.com>

Closes #19868 from jinxing64/SPARK-22676.
2018-04-17 21:52:33 +08:00
Marco Gaido 0a9172a05e [SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization
## What changes were proposed in this pull request?

There was no check on nullability for arguments of `Tuple`s. This could lead to have weird behavior when a null value had to be deserialized into a non-nullable Scala object: in those cases, the `null` got silently transformed in a valid value (like `-1` for `Int`), corresponding to the default value we are using in the SQL codebase. This situation was very likely to happen when deserializing to a Tuple of primitive Scala types (like Double, Int, ...).

The PR adds the `AssertNotNull` to arguments of tuples which have been asked to be converted to non-nullable types.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20976 from mgaido91/SPARK-23835.
2018-04-17 21:45:20 +08:00