Commit graph

6911 commits

Author SHA1 Message Date
Kazuaki Ishizaki 4446a0b0d9 [SPARK-23914][SQL][FOLLOW-UP] refactor ArrayUnion
## What changes were proposed in this pull request?

This PR refactors `ArrayUnion` based on [this suggestion](https://github.com/apache/spark/pull/21103#discussion_r205668821).
1. Generate optimized code for all of the primitive types except `boolean`
1. Generate code using `ArrayBuilder` or `ArrayBuffer`
1. Leave only a generic path in the interpreted path

## How was this patch tested?

Existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21937 from kiszk/SPARK-23914-follow.
2018-08-07 12:07:56 +09:00
Arun Mahadevan 18b6ec1471 [SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress
## What changes were proposed in this pull request?

Currently the Structured Streaming sources and sinks does not have a way to report custom metrics. Providing an option to report custom metrics and making it available via Streaming Query progress can enable sources and sinks to report custom progress information (E.g. the lag metrics for Kafka source).

Similar metrics can be reported for Sinks as well, but would like to get initial feedback before proceeding further.

## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21721 from arunmahadevan/SPARK-24748.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-07 10:28:26 +08:00
Jungtaek Lim 6afe6f32ca [SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics
## What changes were proposed in this pull request?

The patch adds metrics regarding state and watermark to dropwizard metrics, so that watermark and state rows/size can be tracked via time-series manner.

## How was this patch tested?

Manually tested with CSV metric sink.

Closes #21622 from HeartSaVioR/SPARK-24637.

Authored-by: Jungtaek Lim <kabhwan@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-07 10:12:22 +08:00
Marco Gaido 0f3fa2f289 [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
## What changes were proposed in this pull request?

The PR refactors the aggregate expressions which were not using DSL in order to simplify them.

## How was this patch tested?

NA

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21970 from mgaido91/SPARK-24996.
2018-08-06 19:46:51 -04:00
Kazuaki Ishizaki 408a3ff2c4 [SPARK-25036][SQL] Should compare ExprValue.isNull with LiteralTrue/LiteralFalse
## What changes were proposed in this pull request?

This PR fixes a comparison of `ExprValue.isNull` with `String`. `ExprValue.isNull` should be compared with `LiteralTrue` or `LiteralFalse`.

This causes the following compilation error using scala-2.12 with sbt. In addition, this code may also generate incorrect code in Spark 2.3.

```
/home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:94: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely always compare unequal
[error] [warn]         if (eval.isNull != "true") {
[error] [warn]
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:126: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal
[error] [warn]              if (eval.isNull == "true") {
[error] [warn]
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:133: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal
[error] [warn]             if (eval.isNull == "true") {
[error] [warn]
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala:90: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal
[error] [warn]       if (inputs.map(_.isNull).forall(_ == "false")) {
[error] [warn]
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #22012 from kiszk/SPARK-25036a.
2018-08-06 19:43:21 -04:00
Jungtaek Lim 87ca7396c7
[SPARK-24161][SS] Enable debug package feature on structured streaming
## What changes were proposed in this pull request?

Currently, debug package has a implicit class "DebugQuery" which matches Dataset to provide debug features on Dataset class. It doesn't work with structured streaming: it requires query is already started, and the information can be retrieved from StreamingQuery, not Dataset. I guess that's why "explain" had to be placed to StreamingQuery whereas it already exists on Dataset.

This patch adds a new implicit class "DebugStreamQuery" which matches StreamingQuery to provide similar debug features on StreamingQuery class.

## How was this patch tested?

Added relevant unit tests.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21222 from HeartSaVioR/SPARK-24161.
2018-08-06 15:23:47 -07:00
Dongjoon Hyun 278984d5a5 [SPARK-25019][BUILD] Fix orc dependency to use the same exclusion rules
## What changes were proposed in this pull request?

During upgrading Apache ORC to 1.5.2 ([SPARK-24576](https://issues.apache.org/jira/browse/SPARK-24576)), `sql/core` module overrides the exclusion rules of parent pom file and it causes published `spark-sql_2.1X` artifacts have incomplete exclusion rules ([SPARK-25019](https://issues.apache.org/jira/browse/SPARK-25019)). This PR fixes it by moving the newly added exclusion rule to the parent pom. This also fixes the sbt build hack introduced at that time.

## How was this patch tested?

Pass the existing dependency check and the tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #22003 from dongjoon-hyun/SPARK-25019.
2018-08-06 12:00:39 -07:00
Kazuaki Ishizaki 1a5e460762 [SPARK-23913][SQL] Add array_intersect function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_intersect`. The behavior of the function is based on Presto's one.

This function returns returns an array of the elements in the intersection of array1 and array2.

Note: The order of elements in the result is not defined.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21102 from kiszk/SPARK-23913.
2018-08-06 23:27:57 +09:00
Dilip Biswal c1760da5dd [SPARK-25025][SQL] Remove the default value of isAll in INTERSECT/EXCEPT
## What changes were proposed in this pull request?

Having the default value of isAll in the logical plan nodes INTERSECT/EXCEPT could introduce bugs when the callers are not aware of it. This PR removes the default value and makes caller explicitly specify them.

## How was this patch tested?
This is a refactoring change. Existing tests test the functionality already.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #22000 from dilipbiswal/SPARK-25025.
2018-08-06 06:56:36 -04:00
John Zhuge d063e3a478 [SPARK-24940][SQL] Use IntegerLiteral in ResolveCoalesceHints
## What changes were proposed in this pull request?

Follow up to fix an unmerged review comment.

## How was this patch tested?

Unit test ResolveHintsSuite.

Author: John Zhuge <jzhuge@apache.org>

Closes #21998 from jzhuge/SPARK-24940.
2018-08-06 06:41:55 -04:00
Wenchen Fan ac527b5205 [SPARK-24991][SQL] use InternalRow in DataSourceWriter
## What changes were proposed in this pull request?

A follow up of #21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21948 from cloud-fan/row-write.
2018-08-06 15:52:01 +08:00
Takuya UESHIN 327bb30075 [SPARK-23911][SQL] Add aggregate function.
## What changes were proposed in this pull request?

This pr adds `aggregate` function which applies a binary operator to an initial state and all elements in the array, and reduces this to a single state. The final state is converted into the final result by applying a finish function.

```sql
> SELECT aggregate(array(1, 2, 3), (acc, x) -> acc + x);
 6
> SELECT aggregate(array(1, 2, 3), (acc, x) -> acc + x, acc -> acc * 10);
 60
```

## How was this patch tested?

Added tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21982 from ueshin/issues/SPARK-23911/aggregate.
2018-08-05 08:58:35 +09:00
hyukjinkwon 55e3ae6930 [SPARK-25001][BUILD] Fix miscellaneous build warnings
## What changes were proposed in this pull request?

There are many warnings in the current build (for instance see https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/4734/console).

**common**:

```
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java:237: warning: [rawtypes] found raw type: LevelDBIterator
[warn]   void closeIterator(LevelDBIterator it) throws IOException {
[warn]                      ^

[warn]   missing type arguments for generic class LevelDBIterator<T>
[warn]   where T is a type-variable:
[warn]     T extends Object declared in class LevelDBIterator
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:151: warning: [deprecation] group() in AbstractBootstrap has been deprecated
[warn]     if (bootstrap != null && bootstrap.group() != null) {
[warn]                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:152: warning: [deprecation] group() in AbstractBootstrap has been deprecated
[warn]       bootstrap.group().shutdownGracefully();
[warn]                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:154: warning: [deprecation] childGroup() in ServerBootstrap has been deprecated
[warn]     if (bootstrap != null && bootstrap.childGroup() != null) {
[warn]                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:155: warning: [deprecation] childGroup() in ServerBootstrap has been deprecated
[warn]       bootstrap.childGroup().shutdownGracefully();
[warn]                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java:112: warning: [deprecation] PooledByteBufAllocator(boolean,int,int,int,int,int,int,int) in PooledByteBufAllocator has been deprecated
[warn]     return new PooledByteBufAllocator(
[warn]            ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:321: warning: [rawtypes] found raw type: Future
[warn]     public void operationComplete(Future future) throws Exception {
[warn]                                   ^

[warn]   missing type arguments for generic class Future<V>
[warn]   where V is a type-variable:
[warn]     V extends Object declared in interface Future
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java:215: warning: [rawtypes] found raw type: StreamInterceptor
[warn]           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
[warn]           ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java:215: warning: [rawtypes] found raw type: StreamInterceptor
[warn]           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
[warn]                                               ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java:215: warning: [unchecked] unchecked call to StreamInterceptor(MessageHandler<T>,String,long,StreamCallback) as a member of the raw type StreamInterceptor
[warn]           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
[warn]                                           ^

[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java:255: warning: [rawtypes] found raw type: StreamInterceptor
[warn]         StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
[warn]         ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java:255: warning: [rawtypes] found raw type: StreamInterceptor
[warn]         StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
[warn]                                             ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java:255: warning: [unchecked] unchecked call to StreamInterceptor(MessageHandler<T>,String,long,StreamCallback) as a member of the raw type StreamInterceptor
[warn]         StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
[warn]                                         ^

[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java:270: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]         region.transferTo(byteRawChannel, region.transfered());
[warn]                                                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java:304: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]         region.transferTo(byteChannel, region.transfered());
[warn]                                              ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java:119: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]       while (in.transfered() < in.count()) {
[warn]                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java:120: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]         in.transferTo(channel, in.transfered());
[warn]                                  ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java:80: warning: [static] static method should be qualified by type name, Murmur3_x86_32, instead of by an expression
[warn]     Assert.assertEquals(-300363099, hasher.hashUnsafeWords(bytes, offset, 16, 42));
[warn]                                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java:84: warning: [static] static method should be qualified by type name, Murmur3_x86_32, instead of by an expression
[warn]     Assert.assertEquals(-1210324667, hasher.hashUnsafeWords(bytes, offset, 16, 42));
[warn]                                            ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java:88: warning: [static] static method should be qualified by type name, Murmur3_x86_32, instead of by an expression
[warn]     Assert.assertEquals(-634919701, hasher.hashUnsafeWords(bytes, offset, 16, 42));
[warn]                                           ^
```

**launcher**:

```
[warn] Pruning sources from previous analysis, due to incompatible CompileSetup.
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java:31: warning: [rawtypes] found raw type: AbstractLauncher
[warn] public abstract class AbstractLauncher<T extends AbstractLauncher> {
[warn]                                                  ^
[warn]   missing type arguments for generic class AbstractLauncher<T>
[warn]   where T is a type-variable:
[warn]     T extends AbstractLauncher declared in class AbstractLauncher
```

**core**:

```
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala:99: method group in class AbstractBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]     if (bootstrap != null && bootstrap.group() != null) {
[warn]                                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala💯 method group in class AbstractBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]       bootstrap.group().shutdownGracefully()
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala:102: method childGroup in class ServerBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]     if (bootstrap != null && bootstrap.childGroup() != null) {
[warn]                                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala:103: method childGroup in class ServerBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]       bootstrap.childGroup().shutdownGracefully()
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala:151: reflective access of structural type member method getData should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn] This can be achieved by adding the import clause 'import scala.language.reflectiveCalls'
[warn] or by setting the compiler option -language:reflectiveCalls.
[warn] See the Scaladoc for value scala.language.reflectiveCalls for a discussion
[warn] why the feature should be explicitly enabled.
[warn]       val rdd = sc.parallelize(1 to 1).map(concreteObject.getData)
[warn]                                                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala:175: reflective access of structural type member value innerObject2 should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn]       val rdd = sc.parallelize(1 to 1).map(concreteObject.innerObject2.getData)
[warn]                                                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala:175: reflective access of structural type member method getData should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn]       val rdd = sc.parallelize(1 to 1).map(concreteObject.innerObject2.getData)
[warn]                                                                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/LocalSparkContext.scala:32: constructor Slf4JLoggerFactory in class Slf4JLoggerFactory is deprecated: see corresponding Javadoc for more information.
[warn]     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
[warn]                                             ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:218: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]         assert(wrapper.stageAttemptId === stages.head.attemptId)
[warn]                                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:261: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       stageAttemptId = stages.head.attemptId))
[warn]                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:287: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       stageAttemptId = stages.head.attemptId))
[warn]                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:471: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       stageAttemptId = stages.last.attemptId))
[warn]                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:966: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]     listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))
[warn]                                                                          ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:972: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]     listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
[warn]                                                                      ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:976: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       .taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))
[warn]                                             ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:1146: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
[warn]                                                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:1150: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
[warn]                                                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala:197: method transfered in trait FileRegion is deprecated: see corresponding Javadoc for more information.
[warn]     while (region.transfered() < region.count()) {
[warn]                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala:198: method transfered in trait FileRegion is deprecated: see corresponding Javadoc for more information.
[warn]       region.transferTo(byteChannel, region.transfered())
[warn]                                             ^
```

**sql**:

```
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:534: abstract type T is unchecked since it is eliminated by erasure
[warn]       assert(partitioning.isInstanceOf[T])
[warn]                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:534: abstract type T is unchecked since it is eliminated by erasure
[warn]       assert(partitioning.isInstanceOf[T])
[warn]             ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala:323: inferred existential type Option[Class[_$1]]( forSome { type _$1 }), which cannot be expressed by wildcards,  should be enabled
[warn] by making the implicit value scala.language.existentials visible.
[warn] This can be achieved by adding the import clause 'import scala.language.existentials'
[warn] or by setting the compiler option -language:existentials.
[warn] See the Scaladoc for value scala.language.existentials for a discussion
[warn] why the feature should be explicitly enabled.
[warn]       val optClass = Option(collectionCls)
[warn]                            ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:226: warning: [deprecation] ParquetFileReader(Configuration,FileMetaData,Path,List<BlockMetaData>,List<ColumnDescriptor>) in ParquetFileReader has been deprecated
[warn]     this.reader = new ParquetFileReader(
[warn]                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:178: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
[warn]                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:179: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64  &&
[warn]                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:181: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:182: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:183: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:198: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]         switch (descriptor.getType()) {
[warn]                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:221: warning: [deprecation] getTypeLength() in ColumnDescriptor has been deprecated
[warn]             readFixedLenByteArrayBatch(rowId, num, column, descriptor.getTypeLength());
[warn]                                                                      ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:224: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             throw new IOException("Unsupported type: " + descriptor.getType());
[warn]                                                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:246: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]       descriptor.getType().toString(),
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:258: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]     switch (descriptor.getType()) {
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:384: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]         throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
[warn]                                                                                  ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java:458: warning: [static] static variable should be qualified by type name, BaseRepeatedValueVector, instead of by an expression
[warn]       int index = rowId * accessor.OFFSET_WIDTH;
[warn]                                   ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java:460: warning: [static] static variable should be qualified by type name, BaseRepeatedValueVector, instead of by an expression
[warn]       int end = offsets.getInt(index + accessor.OFFSET_WIDTH);
[warn]                                                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala:57: a pure expression does nothing in statement position; you may be omitting necessary parentheses
[warn]       case s => s
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala:182: inferred existential type org.apache.parquet.column.statistics.Statistics[?0]( forSome { type ?0 <: Comparable[?0] }), which cannot be expressed by wildcards,  should be enabled
[warn] by making the implicit value scala.language.existentials visible.
[warn] This can be achieved by adding the import clause 'import scala.language.existentials'
[warn] or by setting the compiler option -language:existentials.
[warn] See the Scaladoc for value scala.language.existentials for a discussion
[warn] why the feature should be explicitly enabled.
[warn]                 val columnStats = oneBlockColumnMeta.getStatistics
[warn]                                                      ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:146: implicit conversion method conv should be enabled
[warn] by making the implicit value scala.language.implicitConversions visible.
[warn] This can be achieved by adding the import clause 'import scala.language.implicitConversions'
[warn] or by setting the compiler option -language:implicitConversions.
[warn] See the Scaladoc for value scala.language.implicitConversions for a discussion
[warn] why the feature should be explicitly enabled.
[warn]     implicit def conv(x: (Int, Long)): KV = KV(x._1, x._2)
[warn]                  ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala:48: implicit conversion method unsafeRow should be enabled
[warn] by making the implicit value scala.language.implicitConversions visible.
[warn]   private implicit def unsafeRow(value: Int) = {
[warn]                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala:178: method getType in class ColumnDescriptor is deprecated: see corresponding Javadoc for more information.
[warn]                 assert(oneFooter.getFileMetaData.getSchema.getColumns.get(0).getType() ===
[warn]                                                                              ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala:154: method readAllFootersInParallel in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn]     ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java:679: warning: [cast] redundant cast to Complex
[warn]     Complex typedOther = (Complex)other;
[warn]                          ^
```

**mllib**:

```
[warn] Pruning sources from previous analysis, due to incompatible CompileSetup.
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala:597: match may not be exhaustive.
[warn] It would fail on the following inputs: None, Some((x: Tuple2[?, ?] forSome x not in (?, ?)))
[warn]     val df = dfs.find {
[warn]                       ^
```

This PR does not target fix all of them since some look pretty tricky to fix and there look too many warnings including false positive (like deprecated API but it's used in its test, etc.)

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21975 from HyukjinKwon/remove-build-warnings.
2018-08-04 11:52:49 -05:00
Wenchen Fan 684c719cc0 [SPARK-23915][SQL][FOLLOWUP] Add array_except function
## What changes were proposed in this pull request?

simplify the codegen:
1. only do real codegen if the type can be specialized by the hash set
2. change the null handling. Before: track the nullElementIndex, and create a new ArrayData to insert the null in the middle. After: track the nullElementIndex, put a null placeholder in the ArrayBuilder, at the end create ArrayData from ArrayBuilder directly.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21966 from cloud-fan/minor2.
2018-08-04 16:35:14 +09:00
Takuya UESHIN 0ecc132d6b [SPARK-23909][SQL] Add filter function.
## What changes were proposed in this pull request?

This pr adds `filter` function which filters the input array using the given predicate.

```sql
> SELECT filter(array(1, 2, 3), x -> x % 2 == 1);
 array(1, 3)
```

## How was this patch tested?

Added tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21965 from ueshin/issues/SPARK-23909/filter.
2018-08-04 16:08:53 +09:00
John Zhuge 36ea55e97e [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL Queries
## What changes were proposed in this pull request?

Many Spark SQL users in my company have asked for a way to control the number of output files in Spark SQL. The users prefer not to use function repartition(n) or coalesce(n, shuffle) that require them to write and deploy Scala/Java/Python code. We propose adding the following Hive-style Coalesce and Repartition Hint to Spark SQL:
```
... SELECT /*+ COALESCE(numPartitions) */ ...
... SELECT /*+ REPARTITION(numPartitions) */ ...
```
Multiple such hints are allowed. Multiple nodes are inserted into the logical plan, and the optimizer will pick the leftmost hint.
```
INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t

== Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `s`, false, false
+- 'UnresolvedHint REPARTITION, [100]
   +- 'UnresolvedHint COALESCE, [500]
      +- 'UnresolvedHint COALESCE, [10]
         +- 'Project [*]
            +- 'UnresolvedRelation `t`

== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand ...
+- Repartition 100, true
   +- HiveTableRelation ...
```

## How was this patch tested?

All unit tests. Manual tests using explain.

Author: John Zhuge <jzhuge@apache.org>

Closes #21911 from jzhuge/SPARK-24940.
2018-08-04 02:27:15 -04:00
Maxim Gekk 41c2227a23 [SPARK-24722][SQL] pivot() with Column type argument
## What changes were proposed in this pull request?

In the PR, I propose column-based API for the `pivot()` function. It allows using of any column expressions as the pivot column. Also this makes it consistent with how groupBy() works.

## How was this patch tested?

I added new tests to `DataFramePivotSuite` and updated PySpark examples for the `pivot()` function.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21699 from MaxGekk/pivot-column.
2018-08-04 14:17:32 +08:00
Dilip Biswal 19a4531913 [SPARK-24997][SQL] Enable support of MINUS ALL
## What changes were proposed in this pull request?
Enable support for MINUS ALL which was gated at AstBuilder.

## How was this patch tested?
Added tests in SQLQueryTestSuite and modify PlanParserSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21963 from dilipbiswal/minus-all.
2018-08-02 22:45:10 -07:00
Chris Horn b0d6967d45 [SPARK-24788][SQL] RelationalGroupedDataset.toString with unresolved exprs should not fail
## What changes were proposed in this pull request?
In the current master, `toString` throws an exception when `RelationalGroupedDataset` has unresolved expressions;
```
scala> spark.range(0, 10).groupBy("id")
res4: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [id: bigint], value: [id: bigint], type: GroupBy]

scala> spark.range(0, 10).groupBy('id)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'id
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:474)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:473)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.RelationalGroupedDataset.toString(RelationalGroupedDataset.scala:473)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
```
This pr fixed code to handle the unresolved case in `RelationalGroupedDataset.toString`.

Closes #21752

## How was this patch tested?
Added tests in `DataFrameAggregateSuite`.

Author: Chris Horn <chorn4033@gmail.com>
Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21964 from maropu/SPARK-24788.
2018-08-02 22:40:58 -07:00
Dilip Biswal 73dd6cf9b5 [SPARK-24966][SQL] Implement precedence rules for set operations.
## What changes were proposed in this pull request?

Currently the set operations INTERSECT, UNION and EXCEPT are assigned the same precedence. This PR fixes the problem by giving INTERSECT  higher precedence than UNION and EXCEPT. UNION and EXCEPT operators are evaluated in the order in which they appear in the query from left to right.

This results in change in behavior because of the change in order of evaluations of set operators in a query. The old behavior is still preserved under a newly added config parameter.

Query `:`
```
SELECT * FROM t1
UNION
SELECT * FROM t2
EXCEPT
SELECT * FROM t3
INTERSECT
SELECT * FROM t4
```
Parsed plan before the change `:`
```
== Parsed Logical Plan ==
'Intersect false
:- 'Except false
:  :- 'Distinct
:  :  +- 'Union
:  :     :- 'Project [*]
:  :     :  +- 'UnresolvedRelation `t1`
:  :     +- 'Project [*]
:  :        +- 'UnresolvedRelation `t2`
:  +- 'Project [*]
:     +- 'UnresolvedRelation `t3`
+- 'Project [*]
   +- 'UnresolvedRelation `t4`
```
Parsed plan after the change `:`
```
== Parsed Logical Plan ==
'Except false
:- 'Distinct
:  +- 'Union
:     :- 'Project [*]
:     :  +- 'UnresolvedRelation `t1`
:     +- 'Project [*]
:        +- 'UnresolvedRelation `t2`
+- 'Intersect false
   :- 'Project [*]
   :  +- 'UnresolvedRelation `t3`
   +- 'Project [*]
      +- 'UnresolvedRelation `t4`
```
## How was this patch tested?
Added tests in PlanParserSuite, SQLQueryTestSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21941 from dilipbiswal/SPARK-24966.
2018-08-02 22:04:17 -07:00
Maxim Gekk b3f2911eeb [SPARK-24945][SQL] Switching to uniVocity 2.7.3
## What changes were proposed in this pull request?

In the PR, I propose to upgrade uniVocity parser from **2.6.3** to **2.7.3**. The recent version includes a fix for the SPARK-24645 issue and has better performance.

Before changes:
```
Parsing quoted values:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
One quoted string                           33336 / 34122          0.0      666727.0       1.0X

Wide rows with 1000 columns:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Select 1000 columns                         90287 / 91713          0.0       90286.9       1.0X
Select 100 columns                          31826 / 36589          0.0       31826.4       2.8X
Select one column                           25738 / 25872          0.0       25737.9       3.5X
count()                                       6931 / 7269          0.1        6931.5      13.0X
```
after:
```
Parsing quoted values:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
One quoted string                           33411 / 33510          0.0      668211.4       1.0X

Wide rows with 1000 columns:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Select 1000 columns                         88028 / 89311          0.0       88028.1       1.0X
Select 100 columns                          29010 / 32755          0.0       29010.1       3.0X
Select one column                           22936 / 22953          0.0       22936.5       3.8X
count()                                       6657 / 6740          0.2        6656.6      13.5X
```
Closes #21892

## How was this patch tested?

It was tested by `CSVSuite` and `CSVBenchmarks`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21969 from MaxGekk/univocity-2_7_3.
2018-08-03 08:33:28 +08:00
Gengliang Wang 7cf16a7fa4 [SPARK-24773] Avro: support logical timestamp type with different precisions
## What changes were proposed in this pull request?

Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29

To specify the output timestamp type, use Dataframe option `outputTimestampType`  or SQL config `spark.sql.avro.outputTimestampType`.  The supported values are
* `TIMESTAMP_MICROS`
* `TIMESTAMP_MILLIS`

The default output type is `TIMESTAMP_MICROS`
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21935 from gengliangwang/avro_timestamp.
2018-08-03 08:32:08 +08:00
Kazuaki Ishizaki bbdcc3bf61 [SPARK-22219][SQL] Refactor code to get a value for "spark.sql.codegen.comments"
## What changes were proposed in this pull request?

This PR refactors code to get a value for "spark.sql.codegen.comments" by avoiding `SparkEnv.get.conf`. This PR uses `SQLConf.get.codegenComments` since `SQLConf.get` always returns an instance of `SQLConf`.

## How was this patch tested?

Added test case to `DebuggingSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19449 from kiszk/SPARK-22219.
2018-08-02 18:19:04 -05:00
Liang-Chi Hsieh d0bc3ed679
[SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query
## What changes were proposed in this pull request?

`Uuid`'s results depend on random seed given during analysis. Thus under streaming query, we will have the same uuids in each execution. This seems to be incorrect for streaming query execution.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21854 from viirya/uuid_in_streaming.
2018-08-02 15:35:46 -07:00
Takeshi Yamamuro efef55388f [SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused
## What changes were proposed in this pull request?
In the current master, `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange`. Then, `ReuseExchange` removes some duplicate exchange and the actual number of registered exchanges changes. Finally, the assertion in `ExchangeCoordinator` fails because the logical number of exchanges and the actual number of registered exchanges become different;
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201

This pr fixed the issue and the code to reproduce this is as follows;
```
scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
...
```

## How was this patch tested?
Added tests in `ExchangeCoordinatorSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21754 from maropu/SPARK-24705-2.
2018-08-02 13:05:36 -07:00
Takuya UESHIN 02f967795b [SPARK-23908][SQL] Add transform function.
## What changes were proposed in this pull request?

This pr adds `transform` function which transforms elements in an array using the function.
Optionally we can take the index of each element as the second argument.

```sql
> SELECT transform(array(1, 2, 3), x -> x + 1);
 array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
 array(1, 3, 5)
```

## How was this patch tested?

Added tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21954 from ueshin/issues/SPARK-23908/transform.
2018-08-02 13:00:33 -07:00
Wenchen Fan f04cd67094 [MINOR] remove dead code in ExpressionEvalHelper
## What changes were proposed in this pull request?

This addresses https://github.com/apache/spark/pull/21236/files#r207078480

both https://github.com/apache/spark/pull/21236 and https://github.com/apache/spark/pull/21838 add a InternalRow result check to ExpressionEvalHelper and becomes duplicated.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21958 from cloud-fan/minor.
2018-08-02 09:26:27 -05:00
Kaya Kupferschmidt 7be6fc3c77 [SPARK-24742] Fix NullPointerexception in Field Metadata
## What changes were proposed in this pull request?

This pull request provides a fix for SPARK-24742: SQL Field MetaData was throwing an Exception in the hashCode method when a "null" Metadata was added via "putNull"

## How was this patch tested?

A new unittest is provided in org/apache/spark/sql/types/MetadataSuite.scala

Author: Kaya Kupferschmidt <k.kupferschmidt@dimajix.de>

Closes #21722 from kupferk/SPARK-24742.
2018-08-02 09:22:21 -05:00
Xiao Li 46110a589f [SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalPlan Node
## What changes were proposed in this pull request?
Remove the AnalysisBarrier LogicalPlan node, which is useless now.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21962 from gatorsmile/refactor2.
2018-08-02 22:20:41 +08:00
Stavros Kontopoulos a65736996b [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support
## What changes were proposed in this pull request?
This PR addresses issues 2,3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2).

* We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue, either add () at the end of the closure or use the trick described in the doc. Otherwise overloading resolution does not work (we are not going to eliminate either of the methods) here. Compiler tries to adapt to Unit and makes these two methods candidates for overloading, when there is polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good but it serves its purpose as we need to support two different uses for method: `addTaskCompletionListener`. One that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue3).

Note: regarding issue 1 in the doc the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None```

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21930 from skonto/scala2.12-sup.
2018-08-02 09:17:09 -05:00
Xiao Li 166f346185 [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE
## What changes were proposed in this pull request?
This PR is to refactor the code in AVERAGE by dsl.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21951 from gatorsmile/refactor1.
2018-08-01 23:00:17 -07:00
Wenchen Fan ce084d3e06 [SPARK-24990][SQL] merge ReadSupport and ReadSupportWithSchema
## What changes were proposed in this pull request?

Regarding user-specified schema, data sources may have 3 different behaviors:
1. must have a user-specified schema
2. can't have a user-specified schema
3. can accept the user-specified if it's given, or infer the schema.

I added `ReadSupportWithSchema` to support these behaviors, following data source v1. But it turns out we don't need this extra interface. We can just add a `createReader(schema, options)` to `ReadSupport` and make it call `createReader(options)` by default.

TODO: also fix the streaming API in followup PRs.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21946 from cloud-fan/ds-schema.
2018-08-01 15:57:54 -07:00
Yuming Wang 9f558601e8 [SPARK-24937][SQL] Datasource partition table should load empty static partitions
## What changes were proposed in this pull request?

How to reproduce:
```sql
spark-sql> CREATE TABLE tbl AS SELECT 1;
spark-sql> CREATE TABLE tbl1 (c1 BIGINT, day STRING, hour STRING)
         > USING parquet
         > PARTITIONED BY (day, hour);
spark-sql> INSERT INTO TABLE tbl1 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl where 1=0;
spark-sql> SHOW PARTITIONS tbl1;
spark-sql> CREATE TABLE tbl2 (c1 BIGINT)
         > PARTITIONED BY (day STRING, hour STRING);
spark-sql> INSERT INTO TABLE tbl2 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl where 1=0;
spark-sql> SHOW PARTITIONS tbl2;
day=2018-07-25/hour=01
spark-sql>
```
1. Users will be confused about whether the partition data of `tbl1` is generated.
2. Inconsistent with Hive table behavior.

This pr fix this issues.

## How was this patch tested?

unit tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21883 from wangyum/SPARK-24937.
2018-08-01 13:58:29 -07:00
Kazuaki Ishizaki 95a9d5e3a5 [SPARK-23915][SQL] Add array_except function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_except`. The behavior of the function is based on Presto's one.

This function returns returns an array of the elements in array1 but not in array2.

Note: The order of elements in the result is not defined.

## How was this patch tested?

Added UTs.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21103 from kiszk/SPARK-23915.
2018-08-02 02:52:30 +08:00
Wenchen Fan defc54c69a [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
## What changes were proposed in this pull request?

This is a follow up of https://github.com/apache/spark/pull/21118 .

In https://github.com/apache/spark/pull/21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21921 from cloud-fan/row.
2018-08-01 21:39:35 +08:00
Reynold Xin 1efffb7993 [SPARK-24982][SQL] UDAF resolution should not throw AssertionError
## What changes were proposed in this pull request?
When user calls anUDAF with the wrong number of arguments, Spark previously throws an AssertionError, which is not supposed to be a user-facing exception.  This patch updates it to throw AnalysisException instead, so it is consistent with a regular UDF.

## How was this patch tested?
Updated test case udaf.sql.

Author: Reynold Xin <rxin@databricks.com>

Closes #21938 from rxin/SPARK-24982.
2018-08-01 00:15:31 -07:00
Reynold Xin 1f7e22c72c [SPARK-24951][SQL] Table valued functions should throw AnalysisException
## What changes were proposed in this pull request?
Previously TVF resolution could throw IllegalArgumentException if the data type is null type. This patch replaces that exception with AnalysisException, enriched with positional information, to improve error message reporting and to be more consistent with rest of Spark SQL.

## How was this patch tested?
Updated the test case in table-valued-functions.sql.out, which is how I identified this problem in the first place.

Author: Reynold Xin <rxin@databricks.com>

Closes #21934 from rxin/SPARK-24951.
2018-07-31 22:25:40 -07:00
DB Tsai 5f3441e542 [SPARK-24893][SQL] Remove the entire CaseWhen if all the outputs are semantic equivalence
## What changes were proposed in this pull request?

Similar to SPARK-24890, if all the outputs of `CaseWhen` are semantic equivalence, `CaseWhen` can be removed.

## How was this patch tested?

Tests added.

Author: DB Tsai <d_tsai@apple.com>

Closes #21852 from dbtsai/short-circuit-when.
2018-08-01 10:31:02 +08:00
Mauro Palsgraaf 4ac2126bc6 [SPARK-24536] Validate that an evaluated limit clause cannot be null
## What changes were proposed in this pull request?

It proposes a version in which nullable expressions are not valid in the limit clause

## How was this patch tested?

It was tested with unit and e2e tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Mauro Palsgraaf <mauropalsgraaf@hotmail.com>

Closes #21807 from mauropalsgraaf/SPARK-24536.
2018-07-31 08:18:08 -07:00
maryannxue b4fd75fb9b [SPARK-24972][SQL] PivotFirst could not handle pivot columns of complex types
## What changes were proposed in this pull request?

When the pivot column is of a complex type, the eval() result will be an UnsafeRow, while the keys of the HashMap for column value matching is a GenericInternalRow. As a result, there will be no match and the result will always be empty.
So for a pivot column of complex-types, we should:
1) If the complex-type is not comparable (orderable), throw an Exception. It cannot be a pivot column.
2) Otherwise, if it goes through the `PivotFirst` code path, `PivotFirst` should use a TreeMap instead of HashMap for such columns.

This PR has also reverted the walk-around in Analyzer that had been introduced to avoid this `PivotFirst` issue.

## How was this patch tested?

Added UT.

Author: maryannxue <maryannxue@apache.org>

Closes #21926 from maryannxue/pivot_followup.
2018-07-30 23:43:53 -07:00
Maxim Gekk d20c10fdf3 [SPARK-24952][SQL] Support LZMA2 compression by Avro datasource
## What changes were proposed in this pull request?

In the PR, I propose to support `LZMA2` (`XZ`) and `BZIP2` compressions by `AVRO` datasource  in write since the codecs may have better characteristics like compression ratio and speed comparing to already supported `snappy` and `deflate` codecs.

## How was this patch tested?

It was tested manually and by an existing test which was extended to check the `xz` and `bzip2` compressions.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21902 from MaxGekk/avro-xz-bzip2.
2018-07-31 09:12:57 +08:00
Reynold Xin abbb4ab4d8 [SPARK-24865][SQL] Remove AnalysisBarrier addendum
## What changes were proposed in this pull request?
I didn't want to pollute the diff in the previous PR and left some TODOs. This is a follow-up to address those TODOs.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #21896 from rxin/SPARK-24865-addendum.
2018-07-30 14:05:45 -07:00
Takeshi Yamamuro 47d84e4d0e [SPARK-22814][SQL] Support Date/Timestamp in a JDBC partition column
## What changes were proposed in this pull request?
This pr supported Date/Timestamp in a JDBC partition column (a numeric column is only supported in the master). This pr also modified code to verify a partition column type;
```
val jdbcTable = spark.read
 .option("partitionColumn", "text")
 .option("lowerBound", "aaa")
 .option("upperBound", "zzz")
 .option("numPartitions", 2)
 .jdbc("jdbc:postgresql:postgres", "t", options)

// with this pr
org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.;
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)

// without this pr
java.lang.NumberFormatException: For input string: "aaa"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
```

Closes #19999

## How was this patch tested?
Added tests in `JDBCSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21834 from maropu/SPARK-22814.
2018-07-30 07:42:00 -07:00
Gengliang Wang b90bfe3c42 [SPARK-24771][BUILD] Upgrade Apache AVRO to 1.8.2
## What changes were proposed in this pull request?

Upgrade Apache Avro from 1.7.7 to 1.8.2. The major new features:

1. More logical types. From the spec of 1.8.2 https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types we can see comparing to [1.7.7](https://avro.apache.org/docs/1.7.7/spec.html#Logical+Types), the new version support:
    - Date
    - Time (millisecond precision)
    - Time (microsecond precision)
    - Timestamp (millisecond precision)
    - Timestamp (microsecond precision)
    - Duration

2. Single-object encoding: https://avro.apache.org/docs/1.8.2/spec.html#single_object_encoding

This PR aims to update Apache Spark to support these new features.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21761 from gengliangwang/upgrade_avro_1.8.
2018-07-30 07:30:47 -07:00
Marco Gaido 85505fc8a5 [SPARK-24957][SQL] Average with decimal followed by aggregation returns wrong result
## What changes were proposed in this pull request?

When we do an average, the result is computed dividing the sum of the values by their count. In the case the result is a DecimalType, the way we are casting/managing the precision and scale is not really optimized and it is not coherent with what we do normally.

In particular, a problem can happen when the `Divide` operand returns a result which contains a precision and scale different by the ones which are expected as output of the `Divide` operand. In the case reported in the JIRA, for instance, the result of the `Divide` operand is a `Decimal(38, 36)`, while the output data type for `Divide` is 38, 22. This is not an issue when the `Divide` is followed by a `CheckOverflow` or a `Cast` to the right data type, as these operations return a decimal with the defined precision and scale. Despite in the `Average` operator we do have a `Cast`, this may be bypassed if the result of `Divide` is the same type which it is casted to, hence the issue reported in the JIRA may arise.

The PR proposes to use the normal rules/handling of the arithmetic operators with Decimal data type, so we both reuse the existing code (having a single logic for operations between decimals) and we fix this problem as the result is always guarded by `CheckOverflow`.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21910 from mgaido91/SPARK-24957.
2018-07-30 20:53:45 +08:00
hyukjinkwon bfe60fcdb4 [SPARK-24934][SQL] Explicitly whitelist supported types in upper/lower bounds for in-memory partition pruning
## What changes were proposed in this pull request?

Looks we intentionally set `null` for upper/lower bounds for complex types and don't use it. However, these look used in in-memory partition pruning, which ends up with incorrect results.

This PR proposes to explicitly whitelist the supported types.

```scala
val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol")
df.cache().filter("arrayCol > array('a', 'b')").show()
```

```scala
val df = sql("select cast('a' as binary) as a")
df.cache().filter("a == cast('a' as binary)").show()
```

**Before:**

```
+--------+
|arrayCol|
+--------+
+--------+
```

```
+---+
|  a|
+---+
+---+
```

**After:**

```
+--------+
|arrayCol|
+--------+
|  [c, d]|
+--------+
```

```
+----+
|   a|
+----+
|[61]|
+----+
```

## How was this patch tested?

Unit tests were added and manually tested.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21882 from HyukjinKwon/stats-filter.
2018-07-30 13:20:03 +08:00
Dilip Biswal 65a4bc143a [SPARK-21274][SQL] Implement INTERSECT ALL clause
## What changes were proposed in this pull request?
Implements INTERSECT ALL clause through query rewrites using existing operators in Spark.  Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design.

Input Query
``` SQL
SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
```
Rewritten Query
```SQL
   SELECT c1
    FROM (
         SELECT replicate_row(min_count, c1)
         FROM (
              SELECT c1,
                     IF (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
              FROM (
                   SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt
                   FROM (
                        SELECT c1, true as vcol1, null as vcol2 FROM ut1
                        UNION ALL
                        SELECT c1, null as vcol1, true as vcol2 FROM ut2
                        ) AS union_all
                   GROUP BY c1
                   HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
                  )
              )
          )
```

## How was this patch tested?
Added test cases in SQLQueryTestSuite, DataFrameSuite, SetOperationSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21886 from dilipbiswal/dkb_intersect_all_final.
2018-07-29 22:11:01 -07:00
hyukjinkwon 6690924c49 [MINOR] Avoid the 'latest' link that might vary per release in functions.scala's comment
## What changes were proposed in this pull request?

This PR propose to address https://github.com/apache/spark/pull/21318#discussion_r187843125 comment.

This is rather a nit but looks we better avoid to update the link for each release since it always points the latest (it doesn't look like worth enough updating release guide on the other hand as well).

## How was this patch tested?

N/A

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21907 from HyukjinKwon/minor-fix.
2018-07-30 10:02:29 +08:00
liulijia 2c54aae1bc [SPARK-24809][SQL] Serializing LongToUnsafeRowMap in executor may result in data error
When join key is long or int in broadcast join, Spark will use `LongToUnsafeRowMap` to store key-values of the table witch will be broadcasted. But, when `LongToUnsafeRowMap` is broadcasted to executors, and it is too big to hold in memory, it will be stored in disk. At that time, because `write` uses a variable `cursor` to determine how many bytes in `page` of `LongToUnsafeRowMap` will be write out and the `cursor` was not restore when deserializing, executor will write out nothing from page into disk.

## What changes were proposed in this pull request?
Restore cursor value when deserializing.

Author: liulijia <liutang123@yeah.net>

Closes #21772 from liutang123/SPARK-24809.
2018-07-29 13:13:00 -07:00
Chris Martin c5b8d54c61 [SPARK-24950][SQL] DateTimeUtilsSuite daysToMillis and millisToDays fails w/java 8 181-b13
## What changes were proposed in this pull request?

- Update DateTimeUtilsSuite so that when testing roundtripping in daysToMillis and millisToDays multiple skipdates can be specified.
- Updated test so that both new years eve 2014 and new years day 2015 are skipped for kiribati time zones.  This is necessary as java versions pre 181-b13 considered new years day 2015 to be skipped while susequent versions corrected this to new years eve.

## How was this patch tested?
Unit tests

Author: Chris Martin <chris@cmartinit.co.uk>

Closes #21901 from d80tb7/SPARK-24950_datetimeUtilsSuite_failures.
2018-07-28 10:40:10 -05:00
Li Jin e8752095a0 [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF and Scalar Pandas UDF
## What changes were proposed in this pull request?

This PR add supports for using mixed Python UDF and Scalar Pandas UDF, in the following two cases:

(1)
```
from pyspark.sql.functions import udf, pandas_udf

udf('int')
def f1(x):
    return x + 1

pandas_udf('int')
def f2(x):
    return x + 1

df = spark.range(0, 1).toDF('v') \
    .withColumn('foo', f1(col('v'))) \
    .withColumn('bar', f2(col('v')))

```

QueryPlan:
```
>>> df.explain(True)
== Parsed Logical Plan ==
'Project [v#2L, foo#5, f2('v) AS bar#9]
+- AnalysisBarrier
      +- Project [v#2L, f1(v#2L) AS foo#5]
         +- Project [id#0L AS v#2L]
            +- Range (0, 1, step=1, splits=Some(4))

== Analyzed Logical Plan ==
v: bigint, foo: int, bar: int
Project [v#2L, foo#5, f2(v#2L) AS bar#9]
+- Project [v#2L, f1(v#2L) AS foo#5]
   +- Project [id#0L AS v#2L]
      +- Range (0, 1, step=1, splits=Some(4))

== Optimized Logical Plan ==
Project [id#0L AS v#2L, f1(id#0L) AS foo#5, f2(id#0L) AS bar#9]
+- Range (0, 1, step=1, splits=Some(4))

== Physical Plan ==
*(2) Project [id#0L AS v#2L, pythonUDF0#13 AS foo#5, pythonUDF0#14 AS bar#9]
+- ArrowEvalPython [f2(id#0L)], [id#0L, pythonUDF0#13, pythonUDF0#14]
   +- BatchEvalPython [f1(id#0L)], [id#0L, pythonUDF0#13]
      +- *(1) Range (0, 1, step=1, splits=4)
```

(2)
```
from pyspark.sql.functions import udf, pandas_udf
udf('int')
def f1(x):
    return x + 1

pandas_udf('int')
def f2(x):
    return x + 1

df = spark.range(0, 1).toDF('v')
df = df.withColumn('foo', f2(f1(df['v'])))
```

QueryPlan:
```
>>> df.explain(True)
== Parsed Logical Plan ==
Project [v#21L, f2(f1(v#21L)) AS foo#46]
+- AnalysisBarrier
      +- Project [v#21L, f1(f2(v#21L)) AS foo#39]
         +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#32]
            +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#25]
               +- Project [id#19L AS v#21L]
                  +- Range (0, 1, step=1, splits=Some(4))

== Analyzed Logical Plan ==
v: bigint, foo: int
Project [v#21L, f2(f1(v#21L)) AS foo#46]
+- Project [v#21L, f1(f2(v#21L)) AS foo#39]
   +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#32]
      +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#25]
         +- Project [id#19L AS v#21L]
            +- Range (0, 1, step=1, splits=Some(4))

== Optimized Logical Plan ==
Project [id#19L AS v#21L, f2(f1(id#19L)) AS foo#46]
+- Range (0, 1, step=1, splits=Some(4))

== Physical Plan ==
*(2) Project [id#19L AS v#21L, pythonUDF0#50 AS foo#46]
+- ArrowEvalPython [f2(pythonUDF0#49)], [id#19L, pythonUDF0#49, pythonUDF0#50]
   +- BatchEvalPython [f1(id#19L)], [id#19L, pythonUDF0#49]
      +- *(1) Range (0, 1, step=1, splits=4)
```

## How was this patch tested?

New tests are added to BatchEvalPythonExecSuite and ScalarPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21650 from icexelloss/SPARK-24624-mix-udf.
2018-07-28 13:41:07 +08:00
Reynold Xin 6424b146c9 [MINOR] Update docs for functions.scala to make it clear not all the built-in functions are defined there
The title summarizes the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #21318 from rxin/functions.
2018-07-27 17:24:55 -07:00
Reynold Xin 34ebcc6b52 [MINOR] Improve documentation for HiveStringType's
The diff should be self-explanatory.

Author: Reynold Xin <rxin@databricks.com>

Closes #21897 from rxin/hivestringtypedoc.
2018-07-27 15:34:06 -07:00
Dilip Biswal 10f1f19659 [SPARK-21274][SQL] Implement EXCEPT ALL clause.
## What changes were proposed in this pull request?
Implements EXCEPT ALL clause through query rewrites using existing operators in Spark. In this PR, an internal UDTF (replicate_rows) is added to aid in preserving duplicate rows. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design.

**Note** This proposed UDTF is kept as a internal function that is purely used to aid with this particular rewrite to give us flexibility to change to a more generalized UDTF in future.

Input Query
``` SQL
SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
```
Rewritten Query
```SQL
SELECT c1
    FROM (
     SELECT replicate_rows(sum_val, c1)
       FROM (
         SELECT c1, sum_val
           FROM (
             SELECT c1, sum(vcol) AS sum_val
               FROM (
                 SELECT 1L as vcol, c1 FROM ut1
                 UNION ALL
                 SELECT -1L as vcol, c1 FROM ut2
              ) AS union_all
            GROUP BY union_all.c1
          )
        WHERE sum_val > 0
       )
   )
```

## How was this patch tested?
Added test cases in SQLQueryTestSuite, DataFrameSuite and SetOperationSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21857 from dilipbiswal/dkb_except_all_final.
2018-07-27 13:47:33 -07:00
Maxim Gekk 0a0f68bae6 [SPARK-24881][SQL] New Avro option - compression
## What changes were proposed in this pull request?

In the PR, I added new option for Avro datasource - `compression`. The option allows to specify compression codec for saved Avro files. This option is similar to `compression` option in another datasources like `JSON` and `CSV`.

Also I added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`. I put the configs into `SQLConf`. If the `compression` option is not specified by an user, the first SQL config is taken into account.

## How was this patch tested?

I added new test which read meta info from written avro files and checks `avro.codec` property.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21837 from MaxGekk/avro-compression.
2018-07-28 00:11:32 +08:00
pkuwm ef6c8395c4 [SPARK-23928][SQL] Add shuffle collection function.
## What changes were proposed in this pull request?

This PR adds a new collection function: shuffle. It generates a random permutation of the given array. This implementation uses the "inside-out" version of Fisher-Yates algorithm.

## How was this patch tested?

New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.

Author: Takuya UESHIN <ueshin@databricks.com>
Author: pkuwm <ihuizhi.lu@gmail.com>

Closes #21802 from ueshin/issues/SPARK-23928/shuffle.
2018-07-27 23:02:48 +09:00
maryannxue 21fcac1645 [SPARK-24288][SQL] Add a JDBC Option to enable preventing predicate pushdown
## What changes were proposed in this pull request?

Add a JDBC Option "pushDownPredicate" (default `true`) to allow/disallow predicate push-down in JDBC data source.

## How was this patch tested?

Add a test in `JDBCSuite`

Author: maryannxue <maryannxue@apache.org>

Closes #21875 from maryannxue/spark-24288.
2018-07-26 23:47:32 -07:00
Reynold Xin e6e9031d7b [SPARK-24865] Remove AnalysisBarrier
## What changes were proposed in this pull request?
AnalysisBarrier was introduced in SPARK-20392 to improve analysis speed (don't re-analyze nodes that have already been analyzed).

Before AnalysisBarrier, we already had some infrastructure in place, with analysis specific functions (resolveOperators and resolveExpressions). These functions do not recursively traverse down subplans that are already analyzed (with a mutable boolean flag _analyzed). The issue with the old system was that developers started using transformDown, which does a top-down traversal of the plan tree, because there was not top-down resolution function, and as a result analyzer performance became pretty bad.

In order to fix the issue in SPARK-20392, AnalysisBarrier was introduced as a special node and for this special node, transform/transformUp/transformDown don't traverse down. However, the introduction of this special node caused a lot more troubles than it solves. This implicit node breaks assumptions and code in a few places, and it's hard to know when analysis barrier would exist, and when it wouldn't. Just a simple search of AnalysisBarrier in PR discussions demonstrates it is a source of bugs and additional complexity.

Instead, this pull request removes AnalysisBarrier and reverts back to the old approach. We added infrastructure in tests that fail explicitly if transform methods are used in the analyzer.

## How was this patch tested?
Added a test suite AnalysisHelperSuite for testing the resolve* methods and transform* methods.

Author: Reynold Xin <rxin@databricks.com>
Author: Xiao Li <gatorsmile@gmail.com>

Closes #21822 from rxin/SPARK-24865.
2018-07-27 14:29:05 +08:00
zuotingbing dc3713cca2 [SPARK-24829][STS] In Spark Thrift Server, CAST AS FLOAT inconsistent with spark-shell or spark-sql
## What changes were proposed in this pull request?

SELECT CAST('4.56' AS FLOAT)

the result is 4.559999942779541

![2018-07-18_110944](https://user-images.githubusercontent.com/24823338/42857199-7c6783da-8a7b-11e8-8c69-1e9302102525.png)

 it should be 4.56 as same as in spark-shell or spark-sql.

![2018-07-18_111111](https://user-images.githubusercontent.com/24823338/42857210-80c89e96-8a7b-11e8-9f8c-de1a79a73752.png)

## How was this patch tested?

add unit tests

Author: zuotingbing <zuo.tingbing9@zte.com.cn>

Closes #21789 from zuotingbing/SPARK-24829.
2018-07-27 13:27:17 +08:00
Gengliang Wang fa09d91925 [SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration
## What changes were proposed in this pull request?

In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session
configuration will come into effect.

Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage.
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21873 from gengliangwang/linterRule.
2018-07-26 16:50:59 -07:00
maryannxue 5ed7660d14 [SPARK-24802][SQL][FOLLOW-UP] Add a new config for Optimization Rule Exclusion
## What changes were proposed in this pull request?

This is an extension to the original PR, in which rule exclusion did not work for classes derived from Optimizer, e.g., SparkOptimizer.
To solve this issue, Optimizer and its derived classes will define/override `defaultBatches` and `nonExcludableRules` in order to define its default rule set as well as rules that cannot be excluded by the SQL config. In the meantime, Optimizer's `batches` method is dedicated to the rule exclusion logic and is defined "final".

## How was this patch tested?

Added UT.

Author: maryannxue <maryannxue@apache.org>

Closes #21876 from maryannxue/rule-exclusion.
2018-07-26 11:06:23 -07:00
Dongjoon Hyun 58353d7f4b [SPARK-24924][SQL] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?

This PR aims to the followings.
1. Like `com.databricks.spark.csv` mapping, we had better map `com.databricks.spark.avro` to built-in Avro data source.
2. Remove incorrect error message, `Please find an Avro package at ...`.

## How was this patch tested?

Pass the newly added tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21878 from dongjoon-hyun/SPARK-24924.
2018-07-26 16:11:03 +08:00
Takuya UESHIN c9b233d414 [SPARK-24878][SQL] Fix reverse function for array type of primitive type containing null.
## What changes were proposed in this pull request?

If we use `reverse` function for array type of primitive type containing `null` and the child array is `UnsafeArrayData`, the function returns a wrong result because `UnsafeArrayData` doesn't define the behavior of re-assignment, especially we can't set a valid value after we set `null`.

## How was this patch tested?

Added some tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21830 from ueshin/issues/SPARK-24878/fix_reverse.
2018-07-26 15:06:13 +08:00
Xiao Li d2e7deb59f [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter
## What changes were proposed in this pull request?
```Scala
      val udf1 = udf({(x: Int, y: Int) => x + y})
      val df = spark.range(0, 3).toDF("a")
        .withColumn("b", udf1($"a", udf1($"a", lit(10))))
      df.cache()
      df.write.saveAsTable("t")
```
Cache is not being used because the plans do not match with the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent.

## How was this patch tested?
Added a test.

Also found a bug in the DSV1 write path. This is not a regression. Thus, opened a separate JIRA https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21821 from gatorsmile/testMaster22.
2018-07-25 17:22:37 -07:00
Koert Kuipers 17f469bc80 [SPARK-24860][SQL] Support setting of partitionOverWriteMode in output options for writing DataFrame
## What changes were proposed in this pull request?

Besides spark setting spark.sql.sources.partitionOverwriteMode also allow setting partitionOverWriteMode per write

## How was this patch tested?

Added unit test in InsertSuite

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Koert Kuipers <koert@tresata.com>

Closes #21818 from koertkuipers/feat-partition-overwrite-mode-per-write.
2018-07-25 13:06:03 -07:00
Maxim Gekk 2f77616e1d [SPARK-24849][SPARK-24911][SQL] Converting a value of StructType to a DDL string
## What changes were proposed in this pull request?

In the PR, I propose to extend the `StructType`/`StructField` classes by new method `toDDL` which converts a value of the `StructType`/`StructField` type to a string formatted in DDL style. The resulted string can be used in a table creation.

The `toDDL` method of `StructField` is reused in `SHOW CREATE TABLE`. In this way the PR fixes the bug of unquoted names of nested fields.

## How was this patch tested?

I add a test for checking the new method and 2 round trip tests: `fromDDL` -> `toDDL` and `toDDL` -> `fromDDL`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21803 from MaxGekk/to-ddl.
2018-07-25 11:09:12 -07:00
Yuming Wang 7a5fd4a91e [SPARK-18874][SQL][FOLLOW-UP] Improvement type mismatched message
## What changes were proposed in this pull request?
Improvement `IN` predicate type mismatched message:
```sql
Mismatched columns:
[(, t, 4, ., `, t, 4, a, `, :, d, o, u, b, l, e, ,,  , t, 5, ., `, t, 5, a, `, :, d, e, c, i, m, a, l, (, 1, 8, ,, 0, ), ), (, t, 4, ., `, t, 4, c, `, :, s, t, r, i, n, g, ,,  , t, 5, ., `, t, 5, c, `, :, b, i, g, i, n, t, )]
```
After this patch:
```sql
Mismatched columns:
[(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, t5.`t5c`:bigint)]
```

## How was this patch tested?

unit tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21863 from wangyum/SPARK-18874.
2018-07-24 23:59:13 -07:00
crafty-coder 78e0a725e0 [SPARK-19018][SQL] Add support for custom encoding on csv writer
## What changes were proposed in this pull request?

Add support for custom encoding on csv writer, see https://issues.apache.org/jira/browse/SPARK-19018

## How was this patch tested?

Added two unit tests in CSVSuite

Author: crafty-coder <carlospb86@gmail.com>
Author: Carlos <crafty-coder@users.noreply.github.com>

Closes #20949 from crafty-coder/master.
2018-07-25 14:17:20 +08:00
Dilip Biswal afb0627536 [SPARK-23957][SQL] Sorts in subqueries are redundant and can be removed
## What changes were proposed in this pull request?
Thanks to henryr for the original idea at https://github.com/apache/spark/pull/21049

Description from the original PR :
Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit).

This patch removes the top sort operators from the subquery plans.

This closes https://github.com/apache/spark/pull/21049.

## How was this patch tested?
Added test cases in SubquerySuite to cover in, exists and scalar subqueries.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21853 from dilipbiswal/SPARK-23957.
2018-07-24 20:46:27 -07:00
DB Tsai d4c3415894 [SPARK-24890][SQL] Short circuiting the if condition when trueValue and falseValue are the same
## What changes were proposed in this pull request?

When `trueValue` and `falseValue` are semantic equivalence, the condition expression in `if` can be removed to avoid extra computation in runtime.

## How was this patch tested?

Test added.

Author: DB Tsai <d_tsai@apple.com>

Closes #21848 from dbtsai/short-circuit-if.
2018-07-24 20:21:11 -07:00
maryannxue c26b092169 [SPARK-24891][SQL] Fix HandleNullInputsForUDF rule
## What changes were proposed in this pull request?

The HandleNullInputsForUDF would always add a new `If` node every time it is applied. That would cause a difference between the same plan being analyzed once and being analyzed twice (or more), thus raising issues like plan not matched in the cache manager. The solution is to mark the arguments as null-checked, which is to add a "KnownNotNull" node above those arguments, when adding the UDF under an `If` node, because clearly the UDF will not be called when any of those arguments is null.

## How was this patch tested?

Add new tests under sql/UDFSuite and AnalysisSuite.

Author: maryannxue <maryannxue@apache.org>

Closes #21851 from maryannxue/spark-24891.
2018-07-24 19:35:34 -07:00
s71955 d4a277f0ce [SPARK-24812][SQL] Last Access Time in the table description is not valid
## What changes were proposed in this pull request?

Last Access Time will always displayed wrong date Thu Jan 01 05:30:00 IST 1970 when user run  DESC FORMATTED table command
In hive its displayed as "UNKNOWN" which makes more sense than displaying wrong date. seems to be a limitation as of now even from hive, better we can follow the hive behavior unless the limitation has been resolved from hive.

spark client output
![spark_desc table](https://user-images.githubusercontent.com/12999161/42753448-ddeea66a-88a5-11e8-94aa-ef8d017f94c5.png)

Hive client output
![hive_behaviour](https://user-images.githubusercontent.com/12999161/42753489-f4fd366e-88a5-11e8-83b0-0f3a53ce83dd.png)

## How was this patch tested?
UT has been added which makes sure that the wrong date "Thu Jan 01 05:30:00 IST 1970 "
shall not be added as value for the Last Access  property

Author: s71955 <sujithchacko.2010@gmail.com>

Closes #21775 from sujith71955/master_hive.
2018-07-24 11:31:27 -07:00
Ryan Blue 9d27541a85 [SPARK-23325] Use InternalRow when reading with DataSourceV2.
## What changes were proposed in this pull request?

This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

## How was this patch tested?

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
2018-07-24 10:46:36 -07:00
hyukjinkwon 3d5c61e5fd [SPARK-22499][FOLLOWUP][SQL] Reduce input string expressions for Least and Greatest to reduce time in its test
## What changes were proposed in this pull request?

It's minor and trivial but looks 2000 input is good enough to reproduce and test in SPARK-22499.

## How was this patch tested?

Manually brought the change and tested.

Locally tested:

Before: 3m 21s 288ms
After: 1m 29s 134ms

Given the latest successful build took:

```
ArithmeticExpressionSuite:
- SPARK-22499: Least and greatest should not generate codes beyond 64KB (7 minutes, 49 seconds)
```

I expect it's going to save 4ish mins.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21855 from HyukjinKwon/minor-fix-suite.
2018-07-24 19:51:09 +08:00
10129659 13a67b070d [SPARK-24870][SQL] Cache can't work normally if there are case letters in SQL
## What changes were proposed in this pull request?
Modified the canonicalized to not case-insensitive.
Before the PR, cache can't work normally if there are case letters in SQL,
for example:
     sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")

    sql("select key, sum(case when Key > 0 then 1 else 0 end) as positiveNum " +
      "from src group by key").cache().createOrReplaceTempView("src_cache")
    sql(
      s"""select a.key
           from
           (select key from src_cache where positiveNum = 1)a
           left join
           (select key from src_cache )b
           on a.key=b.key
        """).explain

The physical plan of the sql is:
![image](https://user-images.githubusercontent.com/26834091/42979518-3decf0fa-8c05-11e8-9837-d5e4c334cb1f.png)

The subquery "select key from src_cache where positiveNum = 1" on the left of join can use the cache data, but the subquery "select key from src_cache" on the right of join cannot use the cache data.

## How was this patch tested?

new added test

Author: 10129659 <chen.yanshan@zte.com.cn>

Closes #21823 from eatoncys/canonicalized.
2018-07-23 23:05:08 -07:00
Yuanjian Li cfc3e1aaa4 [SPARK-24339][SQL] Prunes the unused columns from child of ScriptTransformation
## What changes were proposed in this pull request?

Modify the strategy in ColumnPruning to add a Project between ScriptTransformation and its child, this strategy can reduce the scan time especially in the scenario of the table has many columns.

## How was this patch tested?

Add UT in ColumnPruningSuite and ScriptTransformationSuite.

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #21839 from xuanyuanking/SPARK-24339.
2018-07-23 13:04:39 -07:00
Tathagata Das 61f0ca4f1c [SPARK-24699][SS] Make watermarks work with Trigger.Once by saving updated watermark to commit log
## What changes were proposed in this pull request?

Streaming queries with watermarks do not work with Trigger.Once because of the following.
- Watermark is updated in the driver memory after a batch completes, but it is persisted to checkpoint (in the offset log) only when the next batch is planned
- In trigger.once, the query terminated as soon as one batch has completed. Hence, the updated watermark is never persisted anywhere.

The simple solution is to persist the updated watermark value in the commit log when a batch is marked as completed. Then the next batch, in the next trigger.once run can pick it up from the commit log.

## How was this patch tested?
new unit tests

Co-authored-by: Tathagata Das <tathagata.das1565gmail.com>
Co-authored-by: c-horn <chorn4033gmail.com>

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21746 from tdas/SPARK-24699.
2018-07-23 13:03:32 -07:00
Onur Satici 2edf17effd [SPARK-24850][SQL] fix str representation of CachedRDDBuilder
## What changes were proposed in this pull request?
As of https://github.com/apache/spark/pull/21018, InMemoryRelation includes its cacheBuilder when logging query plans. This PR changes the string representation of the CachedRDDBuilder to not include the cached spark plan.

## How was this patch tested?

spark-shell, query:
```
var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
0 to 1 foreach { _ =>
df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
}
df_cached.explain
```
as of master results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```
with this patch results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
   +- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
         +- *(2) Project [A#10, B#11, B#35, B#87]
            +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
               :- *(2) Filter isnotnull(A#10)
               :  +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
               :        +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :              +- *(2) Project [A#10, B#11, B#35]
               :                 +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
               :                    :- *(2) Filter isnotnull(A#10)
               :                    :  +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
               :                    :        +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :                    :              +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
               :                    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
               :                       +- *(1) Filter isnotnull(A#34)
               :                          +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
               :                                +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :                                      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
                  +- *(1) Filter isnotnull(A#86)
                     +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
                           +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
                                 +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```

Author: Onur Satici <osatici@palantir.com>

Closes #21805 from onursatici/os/inmemoryrelation-str.
2018-07-23 09:52:28 -07:00
maryannxue 434319e73f [SPARK-24802][SQL] Add a new config for Optimization Rule Exclusion
## What changes were proposed in this pull request?

Since Spark has provided fairly clear interfaces for adding user-defined optimization rules, it would be nice to have an easy-to-use interface for excluding an optimization rule from the Spark query optimizer as well.

This would make customizing Spark optimizer easier and sometimes could debugging issues too.

- Add a new config spark.sql.optimizer.excludedRules, with the value being a list of rule names separated by comma.
- Modify the current batches method to remove the excluded rules from the default batches. Log the rules that have been excluded.
- Split the existing default batches into "post-analysis batches" and "optimization batches" so that only rules in the "optimization batches" can be excluded.

## How was this patch tested?

Add a new test suite: OptimizerRuleExclusionSuite

Author: maryannxue <maryannxue@apache.org>

Closes #21764 from maryannxue/rule-exclusion.
2018-07-23 08:25:24 -07:00
SongYadong ab18b02e66 [SQL][HIVE] Correct an assert message in function makeRDDForTable
## What changes were proposed in this pull request?
according to the context, "makeRDDForTablePartitions" in assert message should be "makeRDDForPartitionedTable", because "makeRDDForTablePartitions" does't exist in spark code.

## How was this patch tested?
unit tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: SongYadong <song.yadong1@zte.com.cn>

Closes #21836 from SongYadong/assert_info_modify.
2018-07-23 19:10:53 +08:00
Gengliang Wang 8817c68f50 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.

I created #21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/

Additional changes In this PR:
1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.

## How was this patch tested?

Unit test
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
```

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21838 from gengliangwang/from_and_to_avro.
2018-07-22 17:36:57 -07:00
William Sheu bbd6f0c25f [SPARK-24879][SQL] Fix NPE in Hive partition pruning filter pushdown
## What changes were proposed in this pull request?
We get a NPE when we have a filter on a partition column of the form `col in (x, null)`. This is due to the filter converter in HiveShim not handling `null`s correctly. This patch fixes this bug while still pushing down as much of the partition pruning predicates as possible, by filtering out `null`s from any `in` predicate. Since Hive only supports very simple partition pruning filters, this change should preserve correctness.

## How was this patch tested?
Unit tests, manual tests

Author: William Sheu <william.sheu@databricks.com>

Closes #21832 from PenguinToast/partition-pruning-npe.
2018-07-20 19:59:28 -07:00
Brandon Krieger 597bdeff2d [SPARK-24488][SQL] Fix issue when generator is aliased multiple times
## What changes were proposed in this pull request?

Currently, the Analyzer throws an exception if your try to nest a generator. However, it special cases generators "nested" in an alias, and allows that. If you try to alias a generator twice, it is not caught by the special case, so an exception is thrown.

This PR trims the unnecessary, non-top-level aliases, so that the generator is allowed.

## How was this patch tested?

new tests in AnalysisSuite.

Author: Brandon Krieger <bkrieger@palantir.com>

Closes #21508 from bkrieger/bk/SPARK-24488.
2018-07-21 00:44:00 +02:00
Daniel van der Ende 2333a34d39 [SPARK-22880][SQL] Add cascadeTruncate option to JDBC datasource
This commit adds the `cascadeTruncate` option to the JDBC datasource
API, for databases that support this functionality (PostgreSQL and
Oracle at the moment). This allows for applying a cascading truncate
that affects tables that have foreign key constraints on the table
being truncated.

## What changes were proposed in this pull request?

Add `cascadeTruncate` option to JDBC datasource API. Allow this to affect the
`TRUNCATE` query for databases that support this option.

## How was this patch tested?
Existing tests for `truncateQuery` were updated. Also, an additional test was added
to ensure that the correct syntax was applied, and that enabling the config for databases
that do not support this option does not result in invalid queries.

Author: Daniel van der Ende <daniel.vanderende@gmail.com>

Closes #20057 from danielvdende/SPARK-22880.
2018-07-20 13:03:57 -07:00
Xiao Li 9ad77b3037 Revert "[SPARK-24811][SQL] Avro: add new function from_avro and to_avro"
This reverts commit 244bcff194.
2018-07-20 12:55:38 -07:00
Gengliang Wang 244bcff194 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

Add a new function to_avro for converting a column into binary of avro format with the specified schema.

This PR is in progress. Will add test cases.
## How was this patch tested?

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21774 from gengliangwang/from_and_to_avro.
2018-07-20 09:19:29 -07:00
hyukjinkwon e0b6383218 [SPARK-23731][SQL] Make FileSourceScanExec canonicalizable after being (de)serialized
## What changes were proposed in this pull request?

### What's problem?

In some cases, sub scalar query could throw a NPE, which is caused in execution side.

```
java.lang.NullPointerException
	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
	at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
	at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
	at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
	at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
	at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.get(HashMap.scala:70)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### How does this happen?

Here looks what happen now:

1. Sub scalar query was made (for instance `SELECT (SELECT id FROM foo)`).

2. Try to extract some common expressions (via `CodeGenerator.subexpressionElimination`) so that it can generates some common codes and can be reused.

3. During this, seems it extracts some expressions that can be reused (via `EquivalentExpressions.addExprTree`)

  b2deef64f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (L1102)

4. During this, if the hash (`EquivalentExpressions.Expr.hashCode`) happened to be the same at `EquivalentExpressions.addExpr` anyhow, `EquivalentExpressions.Expr.equals` is called to identify object in the same hash, which eventually calls `semanticEquals` in `ScalarSubquery`

  087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L54)

  087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L36)

5. `ScalarSubquery`'s `semanticEquals` needs `SubqueryExec`'s `sameResult`

  77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L58)

6. `SubqueryExec`'s `sameResult` requires a canonicalized plan which calls `FileSourceScanExec`'s `doCanonicalize`

  e008ad1752/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala (L258)

7. In `FileSourceScanExec`'s `doCanonicalize`, `FileSourceScanExec`'s `relation` is required but seems `transient` so it becomes `null`.

  e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L527)

  e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L160)

8. NPE is thrown.

\*1. driver side
\*2., 3., 4., 5., 6., 7., 8. executor side

Note that most of cases, it looks fine because we will usually call:

087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L40)

which make a canonicalized plan via:

b045315e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala (L192)

77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L52)

### How to reproduce?

This looks what happened now. I can reproduce this by a bit of messy way:

```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804ce1e..d25fc9a7ba9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
 -37,7 +37,9  class EquivalentExpressions {
       case _ => false
     }

-    override def hashCode: Int = e.semanticHash()
+    override def hashCode: Int = {
+      1
+    }
   }
```

```scala
spark.range(1).write.mode("overwrite").parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").createOrReplaceTempView("foo")
spark.conf.set("spark.sql.codegen.wholeStage", false)
sql("SELECT (SELECT id FROM foo) == (SELECT id FROM foo)").collect()
```

### How does this PR fix?

- Make all variables that access to `FileSourceScanExec`'s `relation` as `lazy val` so that we avoid NPE. This is a temporary fix.

- Allow `makeCopy` in `SparkPlan` without Spark session too. This looks still able to be accessed within executor side. For instance:

  ```
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:70)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:47)
	at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:233)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:243)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
	at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
	at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
	at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
	at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
	at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.get(HashMap.scala:70)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
  ```

This PR takes over https://github.com/apache/spark/pull/20856.

## How was this patch tested?

Manually tested and unit test was added.

Closes #20856

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21815 from HyukjinKwon/SPARK-23731.
2018-07-20 20:59:48 +08:00
Takuya UESHIN 7b6d36bc9e [SPARK-24871][SQL] Refactor Concat and MapConcat to avoid creating concatenator object for each row.
## What changes were proposed in this pull request?

Refactor `Concat` and `MapConcat` to:

- avoid creating concatenator object for each row.
- make `Concat` handle `containsNull` properly.
- make `Concat` shortcut if `null` child is found.

## How was this patch tested?

Added some tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21824 from ueshin/issues/SPARK-24871/refactor_concat_mapconcat.
2018-07-20 20:08:42 +08:00
Dilip Biswal 2b91d9918c [SPARK-24424][SQL] Support ANSI-SQL compliant syntax for GROUPING SET
## What changes were proposed in this pull request?

Enhances the parser and analyzer to support ANSI compliant syntax for GROUPING SET. As part of this change we derive the grouping expressions from user supplied groupings in the grouping sets clause.

```SQL
SELECT c1, c2, max(c3)
FROM t1
GROUP BY GROUPING SETS ((c1), (c1, c2))
```

## How was this patch tested?
Added tests in SQLQueryTestSuite and ResolveGroupingAnalyticsSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21813 from dilipbiswal/spark-24424.
2018-07-19 23:52:53 -07:00
Marco Gaido a5925c1631 [SPARK-24268][SQL] Use datatype.catalogString in error messages
## What changes were proposed in this pull request?

As stated in https://github.com/apache/spark/pull/21321, in the error messages we should use `catalogString`. This is not the case, as SPARK-22893 used `simpleString` in order to have the same representation everywhere and it missed some places.

The PR unifies the messages using alway the `catalogString` representation of the dataTypes in the messages.

## How was this patch tested?

existing/modified UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21804 from mgaido91/SPARK-24268_catalog.
2018-07-19 23:29:29 -07:00
Wenchen Fan 1462b17666 [SPARK-24861][SS][TEST] create corrected temp directories in RateSourceSuite
## What changes were proposed in this pull request?

`RateSourceSuite` may leave garbage files under `sql/core/dummy`, we should use a corrected temp directory

## How was this patch tested?

test only

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21817 from cloud-fan/minor.
2018-07-20 13:40:26 +08:00
Ger van Rossum 67e108daa6 [SPARK-24846][SQL] Made hashCode ExprId independent of jvmId
## What changes were proposed in this pull request?
Made ExprId hashCode independent of jvmId to make canonicalization independent of JVM, by overriding hashCode (and necessarily also equality) to depend on id only

## How was this patch tested?
Created a unit test ExprIdSuite
Ran all unit tests of sql/catalyst

Author: Ger van Rossum <gvr@users.noreply.github.com>

Closes #21806 from gvr/spark24846-canonicalization.
2018-07-19 23:28:16 +02:00
Tathagata Das b3d88ac029 [SPARK-22187][SS] Update unsaferow format for saved state in flatMapGroupsWithState to allow timeouts with deleted state
## What changes were proposed in this pull request?

Currently, the group state of user-defined-type is encoded as top-level columns in the UnsafeRows stores in the state store. The timeout timestamp is also saved as (when needed) as the last top-level column. Since the group state is serialized to top-level columns, you cannot save "null" as a value of state (setting null in all the top-level columns is not equivalent). So we don't let the user set the timeout without initializing the state for a key. Based on user experience, this leads to confusion.

This PR is to change the row format such that the state is saved as nested columns. This would allow the state to be set to null, and avoid these confusing corner cases. However, queries recovering from existing checkpoint will use the previous format to maintain compatibility with existing production queries.

## How was this patch tested?
Refactored existing end-to-end tests and added new tests for explicitly testing obj-to-row conversion for both state formats.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21739 from tdas/SPARK-22187-1.
2018-07-19 13:17:28 -07:00
Gengliang Wang 6a9a058e09 [SPARK-24858][SQL] Avoid unnecessary parquet footer reads
## What changes were proposed in this pull request?

Currently the same Parquet footer is read twice in the function `buildReaderWithPartitionValues` of ParquetFileFormat if filter push down is enabled.

Fix it with simple changes.
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21814 from gengliangwang/parquetFooter.
2018-07-19 22:24:53 +08:00
Jungtaek Lim 8b7d4f842f [SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider
## What changes were proposed in this pull request?

This patch proposes breaking down configuration of retaining batch size on state into two pieces: files and in memory (cache). While this patch reuses existing configuration for files, it introduces new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory" to configure max count of batch to retain in memory.

## How was this patch tested?

Apply this patch on top of SPARK-24441 (https://github.com/apache/spark/pull/21469), and manually tested in various workloads to ensure overall size of states in memory is around 2x or less of the size of latest version of state, while it was 10x ~ 80x before applying the patch.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21700 from HeartSaVioR/SPARK-24717.
2018-07-19 00:07:35 -07:00
Wenchen Fan d05a926e78 [SPARK-24840][SQL] do not use dummy filter to switch codegen on/of
## What changes were proposed in this pull request?

It's a little tricky and fragile to use a dummy filter to switch codegen on/off. For now we should use local/cached relation to switch. In the future when we are able to use a config to turn off codegen, we shall use that.

## How was this patch tested?

test only PR.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21795 from cloud-fan/follow.
2018-07-19 11:54:41 +08:00
Sean Owen 753f115162 [SPARK-21261][DOCS][SQL] SQL Regex document fix
## What changes were proposed in this pull request?

Fix regexes in spark-sql command examples.
This takes over https://github.com/apache/spark/pull/18477

## How was this patch tested?

Existing tests. I verified the existing example doesn't work in spark-sql, but new ones does.

Author: Sean Owen <srowen@gmail.com>

Closes #21808 from srowen/SPARK-21261.
2018-07-18 18:39:23 -05:00
maryannxue cd203e0dfc [SPARK-24163][SPARK-24164][SQL] Support column list as the pivot column in Pivot
## What changes were proposed in this pull request?

1. Extend the Parser to enable parsing a column list as the pivot column.
2. Extend the Parser and the Pivot node to enable parsing complex expressions with aliases as the pivot value.
3. Add type check and constant check in Analyzer for Pivot node.

## How was this patch tested?

Add tests in pivot.sql

Author: maryannxue <maryannxue@apache.org>

Closes #21720 from maryannxue/spark-24164.
2018-07-18 13:33:26 -07:00
韩田田00222924 002300dd41 [SPARK-24804] There are duplicate words in the test title in the DatasetSuite
## What changes were proposed in this pull request?
In DatasetSuite.scala, in the 1299 line,
test("SPARK-19896: cannot have circular references in in case class") ,
there are  duplicate words "in in".  We can get rid of one.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: 韩田田00222924 <han.tiantian@zte.com.cn>

Closes #21767 from httfighter/inin.
2018-07-18 09:40:36 -05:00
Takuya UESHIN 34cb3b54e9 [SPARK-24386][SPARK-24768][BUILD][FOLLOWUP] Fix lint-java and Scala 2.12 build.
## What changes were proposed in this pull request?

This pr fixes lint-java and Scala 2.12 build.

lint-java:

```
[ERROR] src/test/resources/log4j.properties:[0] (misc) NewlineAtEndOfFile: File does not end with a newline.
```

Scala 2.12 build:

```
[error] /.../sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala:121: overloaded method value addTaskCompletionListener with alternatives:
[error]   (f: org.apache.spark.TaskContext => Unit)org.apache.spark.TaskContext <and>
[error]   (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error]  cannot be applied to (org.apache.spark.TaskContext => java.util.List[Runnable])
[error]       context.addTaskCompletionListener { ctx =>
[error]               ^
```

## How was this patch tested?

Manually executed lint-java and Scala 2.12 build in my local environment.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21801 from ueshin/issues/SPARK-24386_24768/fix_build.
2018-07-18 19:17:18 +08:00