Commit graph

22523 commits

Author SHA1 Message Date
Yuval Itzchakov b7fdf8eb20 [SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for TopicPartition
## What changes were proposed in this pull request?

This small fix adds a `consumer.release()` call to `KafkaSourceRDD` in the case where we've retrieved offsets from Kafka, but the `fromOffset` is equal to the `lastOffset`, meaning there is no new data to read for a particular topic partition. Up until now, we'd just return an empty iterator without closing the consumer which would cause a FD leak.

If accepted, this pull request should be merged into master as well.

## How was this patch tested?

Haven't ran any specific tests, would love help on how to test methods running inside `RDD.compute`.

Author: Yuval Itzchakov <yuval.itzchakov@clicktale.com>

Closes #21997 from YuvalItzchakov/master.
2018-08-04 14:44:10 -05:00
hyukjinkwon 55e3ae6930 [SPARK-25001][BUILD] Fix miscellaneous build warnings
## What changes were proposed in this pull request?

There are many warnings in the current build (for instance see https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/4734/console).

**common**:

```
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java:237: warning: [rawtypes] found raw type: LevelDBIterator
[warn]   void closeIterator(LevelDBIterator it) throws IOException {
[warn]                      ^

[warn]   missing type arguments for generic class LevelDBIterator<T>
[warn]   where T is a type-variable:
[warn]     T extends Object declared in class LevelDBIterator
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:151: warning: [deprecation] group() in AbstractBootstrap has been deprecated
[warn]     if (bootstrap != null && bootstrap.group() != null) {
[warn]                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:152: warning: [deprecation] group() in AbstractBootstrap has been deprecated
[warn]       bootstrap.group().shutdownGracefully();
[warn]                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:154: warning: [deprecation] childGroup() in ServerBootstrap has been deprecated
[warn]     if (bootstrap != null && bootstrap.childGroup() != null) {
[warn]                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:155: warning: [deprecation] childGroup() in ServerBootstrap has been deprecated
[warn]       bootstrap.childGroup().shutdownGracefully();
[warn]                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java:112: warning: [deprecation] PooledByteBufAllocator(boolean,int,int,int,int,int,int,int) in PooledByteBufAllocator has been deprecated
[warn]     return new PooledByteBufAllocator(
[warn]            ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:321: warning: [rawtypes] found raw type: Future
[warn]     public void operationComplete(Future future) throws Exception {
[warn]                                   ^

[warn]   missing type arguments for generic class Future<V>
[warn]   where V is a type-variable:
[warn]     V extends Object declared in interface Future
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java:215: warning: [rawtypes] found raw type: StreamInterceptor
[warn]           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
[warn]           ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java:215: warning: [rawtypes] found raw type: StreamInterceptor
[warn]           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
[warn]                                               ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java:215: warning: [unchecked] unchecked call to StreamInterceptor(MessageHandler<T>,String,long,StreamCallback) as a member of the raw type StreamInterceptor
[warn]           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
[warn]                                           ^

[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java:255: warning: [rawtypes] found raw type: StreamInterceptor
[warn]         StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
[warn]         ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java:255: warning: [rawtypes] found raw type: StreamInterceptor
[warn]         StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
[warn]                                             ^

[warn]   missing type arguments for generic class StreamInterceptor<T>
[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java:255: warning: [unchecked] unchecked call to StreamInterceptor(MessageHandler<T>,String,long,StreamCallback) as a member of the raw type StreamInterceptor
[warn]         StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
[warn]                                         ^

[warn]   where T is a type-variable:
[warn]     T extends Message declared in class StreamInterceptor
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java:270: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]         region.transferTo(byteRawChannel, region.transfered());
[warn]                                                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java:304: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]         region.transferTo(byteChannel, region.transfered());
[warn]                                              ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java:119: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]       while (in.transfered() < in.count()) {
[warn]                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java:120: warning: [deprecation] transfered() in FileRegion has been deprecated
[warn]         in.transferTo(channel, in.transfered());
[warn]                                  ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java:80: warning: [static] static method should be qualified by type name, Murmur3_x86_32, instead of by an expression
[warn]     Assert.assertEquals(-300363099, hasher.hashUnsafeWords(bytes, offset, 16, 42));
[warn]                                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java:84: warning: [static] static method should be qualified by type name, Murmur3_x86_32, instead of by an expression
[warn]     Assert.assertEquals(-1210324667, hasher.hashUnsafeWords(bytes, offset, 16, 42));
[warn]                                            ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java:88: warning: [static] static method should be qualified by type name, Murmur3_x86_32, instead of by an expression
[warn]     Assert.assertEquals(-634919701, hasher.hashUnsafeWords(bytes, offset, 16, 42));
[warn]                                           ^
```

**launcher**:

```
[warn] Pruning sources from previous analysis, due to incompatible CompileSetup.
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java:31: warning: [rawtypes] found raw type: AbstractLauncher
[warn] public abstract class AbstractLauncher<T extends AbstractLauncher> {
[warn]                                                  ^
[warn]   missing type arguments for generic class AbstractLauncher<T>
[warn]   where T is a type-variable:
[warn]     T extends AbstractLauncher declared in class AbstractLauncher
```

**core**:

```
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala:99: method group in class AbstractBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]     if (bootstrap != null && bootstrap.group() != null) {
[warn]                                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala💯 method group in class AbstractBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]       bootstrap.group().shutdownGracefully()
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala:102: method childGroup in class ServerBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]     if (bootstrap != null && bootstrap.childGroup() != null) {
[warn]                                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/main/scala/org/apache/spark/api/r/RBackend.scala:103: method childGroup in class ServerBootstrap is deprecated: see corresponding Javadoc for more information.
[warn]       bootstrap.childGroup().shutdownGracefully()
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala:151: reflective access of structural type member method getData should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn] This can be achieved by adding the import clause 'import scala.language.reflectiveCalls'
[warn] or by setting the compiler option -language:reflectiveCalls.
[warn] See the Scaladoc for value scala.language.reflectiveCalls for a discussion
[warn] why the feature should be explicitly enabled.
[warn]       val rdd = sc.parallelize(1 to 1).map(concreteObject.getData)
[warn]                                                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala:175: reflective access of structural type member value innerObject2 should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn]       val rdd = sc.parallelize(1 to 1).map(concreteObject.innerObject2.getData)
[warn]                                                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala:175: reflective access of structural type member method getData should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn]       val rdd = sc.parallelize(1 to 1).map(concreteObject.innerObject2.getData)
[warn]                                                                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/LocalSparkContext.scala:32: constructor Slf4JLoggerFactory in class Slf4JLoggerFactory is deprecated: see corresponding Javadoc for more information.
[warn]     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
[warn]                                             ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:218: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]         assert(wrapper.stageAttemptId === stages.head.attemptId)
[warn]                                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:261: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       stageAttemptId = stages.head.attemptId))
[warn]                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:287: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       stageAttemptId = stages.head.attemptId))
[warn]                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:471: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       stageAttemptId = stages.last.attemptId))
[warn]                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:966: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]     listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))
[warn]                                                                          ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:972: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]     listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
[warn]                                                                      ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:976: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       .taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))
[warn]                                             ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:1146: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
[warn]                                                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:1150: value attemptId in class StageInfo is deprecated: Use attemptNumber instead
[warn]       SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
[warn]                                                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala:197: method transfered in trait FileRegion is deprecated: see corresponding Javadoc for more information.
[warn]     while (region.transfered() < region.count()) {
[warn]                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala:198: method transfered in trait FileRegion is deprecated: see corresponding Javadoc for more information.
[warn]       region.transferTo(byteChannel, region.transfered())
[warn]                                             ^
```

**sql**:

```
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:534: abstract type T is unchecked since it is eliminated by erasure
[warn]       assert(partitioning.isInstanceOf[T])
[warn]                                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:534: abstract type T is unchecked since it is eliminated by erasure
[warn]       assert(partitioning.isInstanceOf[T])
[warn]             ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala:323: inferred existential type Option[Class[_$1]]( forSome { type _$1 }), which cannot be expressed by wildcards,  should be enabled
[warn] by making the implicit value scala.language.existentials visible.
[warn] This can be achieved by adding the import clause 'import scala.language.existentials'
[warn] or by setting the compiler option -language:existentials.
[warn] See the Scaladoc for value scala.language.existentials for a discussion
[warn] why the feature should be explicitly enabled.
[warn]       val optClass = Option(collectionCls)
[warn]                            ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:226: warning: [deprecation] ParquetFileReader(Configuration,FileMetaData,Path,List<BlockMetaData>,List<ColumnDescriptor>) in ParquetFileReader has been deprecated
[warn]     this.reader = new ParquetFileReader(
[warn]                   ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:178: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
[warn]                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:179: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64  &&
[warn]                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:181: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:182: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:183: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:198: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]         switch (descriptor.getType()) {
[warn]                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:221: warning: [deprecation] getTypeLength() in ColumnDescriptor has been deprecated
[warn]             readFixedLenByteArrayBatch(rowId, num, column, descriptor.getTypeLength());
[warn]                                                                      ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:224: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]             throw new IOException("Unsupported type: " + descriptor.getType());
[warn]                                                                    ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:246: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]       descriptor.getType().toString(),
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:258: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]     switch (descriptor.getType()) {
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java:384: warning: [deprecation] getType() in ColumnDescriptor has been deprecated
[warn]         throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
[warn]                                                                                  ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java:458: warning: [static] static variable should be qualified by type name, BaseRepeatedValueVector, instead of by an expression
[warn]       int index = rowId * accessor.OFFSET_WIDTH;
[warn]                                   ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java:460: warning: [static] static variable should be qualified by type name, BaseRepeatedValueVector, instead of by an expression
[warn]       int end = offsets.getInt(index + accessor.OFFSET_WIDTH);
[warn]                                                ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala:57: a pure expression does nothing in statement position; you may be omitting necessary parentheses
[warn]       case s => s
[warn]                 ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala:182: inferred existential type org.apache.parquet.column.statistics.Statistics[?0]( forSome { type ?0 <: Comparable[?0] }), which cannot be expressed by wildcards,  should be enabled
[warn] by making the implicit value scala.language.existentials visible.
[warn] This can be achieved by adding the import clause 'import scala.language.existentials'
[warn] or by setting the compiler option -language:existentials.
[warn] See the Scaladoc for value scala.language.existentials for a discussion
[warn] why the feature should be explicitly enabled.
[warn]                 val columnStats = oneBlockColumnMeta.getStatistics
[warn]                                                      ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:146: implicit conversion method conv should be enabled
[warn] by making the implicit value scala.language.implicitConversions visible.
[warn] This can be achieved by adding the import clause 'import scala.language.implicitConversions'
[warn] or by setting the compiler option -language:implicitConversions.
[warn] See the Scaladoc for value scala.language.implicitConversions for a discussion
[warn] why the feature should be explicitly enabled.
[warn]     implicit def conv(x: (Int, Long)): KV = KV(x._1, x._2)
[warn]                  ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala:48: implicit conversion method unsafeRow should be enabled
[warn] by making the implicit value scala.language.implicitConversions visible.
[warn]   private implicit def unsafeRow(value: Int) = {
[warn]                        ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala:178: method getType in class ColumnDescriptor is deprecated: see corresponding Javadoc for more information.
[warn]                 assert(oneFooter.getFileMetaData.getSchema.getColumns.get(0).getType() ===
[warn]                                                                              ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala:154: method readAllFootersInParallel in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn]     ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq
[warn]                       ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java:679: warning: [cast] redundant cast to Complex
[warn]     Complex typedOther = (Complex)other;
[warn]                          ^
```

**mllib**:

```
[warn] Pruning sources from previous analysis, due to incompatible CompileSetup.
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala:597: match may not be exhaustive.
[warn] It would fail on the following inputs: None, Some((x: Tuple2[?, ?] forSome x not in (?, ?)))
[warn]     val df = dfs.find {
[warn]                       ^
```

This PR does not target fix all of them since some look pretty tricky to fix and there look too many warnings including false positive (like deprecated API but it's used in its test, etc.)

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21975 from HyukjinKwon/remove-build-warnings.
2018-08-04 11:52:49 -05:00
Nihar Sheth 70462f291b [SPARK-24926][CORE] Ensure numCores is used consistently in all netty configurations
## What changes were proposed in this pull request?

Netty could just ignore user-provided configurations. In particular, spark.driver.cores would be ignored when considering the number of cores available to netty (which would usually just default to Runtime.availableProcessors() ). In transport configurations, the number of threads are based directly on how many cores the system believes it has available, and in yarn cluster mode this would generally overshoot the user-preferred value.

## How was this patch tested?

As this is mostly a configuration change, tests were done manually by adding spark-submit confs and verifying the number of threads started by netty was what was expected.

Passes scalastyle checks from dev/run-tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Nihar Sheth <niharrsheth@gmail.com>

Closes #21885 from NiharS/usableCores.
2018-08-04 10:27:34 -05:00
Wenchen Fan 684c719cc0 [SPARK-23915][SQL][FOLLOWUP] Add array_except function
## What changes were proposed in this pull request?

simplify the codegen:
1. only do real codegen if the type can be specialized by the hash set
2. change the null handling. Before: track the nullElementIndex, and create a new ArrayData to insert the null in the middle. After: track the nullElementIndex, put a null placeholder in the ArrayBuilder, at the end create ArrayData from ArrayBuilder directly.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21966 from cloud-fan/minor2.
2018-08-04 16:35:14 +09:00
Takuya UESHIN 0ecc132d6b [SPARK-23909][SQL] Add filter function.
## What changes were proposed in this pull request?

This pr adds `filter` function which filters the input array using the given predicate.

```sql
> SELECT filter(array(1, 2, 3), x -> x % 2 == 1);
 array(1, 3)
```

## How was this patch tested?

Added tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21965 from ueshin/issues/SPARK-23909/filter.
2018-08-04 16:08:53 +09:00
John Zhuge 36ea55e97e [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL Queries
## What changes were proposed in this pull request?

Many Spark SQL users in my company have asked for a way to control the number of output files in Spark SQL. The users prefer not to use function repartition(n) or coalesce(n, shuffle) that require them to write and deploy Scala/Java/Python code. We propose adding the following Hive-style Coalesce and Repartition Hint to Spark SQL:
```
... SELECT /*+ COALESCE(numPartitions) */ ...
... SELECT /*+ REPARTITION(numPartitions) */ ...
```
Multiple such hints are allowed. Multiple nodes are inserted into the logical plan, and the optimizer will pick the leftmost hint.
```
INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t

== Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `s`, false, false
+- 'UnresolvedHint REPARTITION, [100]
   +- 'UnresolvedHint COALESCE, [500]
      +- 'UnresolvedHint COALESCE, [10]
         +- 'Project [*]
            +- 'UnresolvedRelation `t`

== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand ...
+- Repartition 100, true
   +- HiveTableRelation ...
```

## How was this patch tested?

All unit tests. Manual tests using explain.

Author: John Zhuge <jzhuge@apache.org>

Closes #21911 from jzhuge/SPARK-24940.
2018-08-04 02:27:15 -04:00
Maxim Gekk 41c2227a23 [SPARK-24722][SQL] pivot() with Column type argument
## What changes were proposed in this pull request?

In the PR, I propose column-based API for the `pivot()` function. It allows using of any column expressions as the pivot column. Also this makes it consistent with how groupBy() works.

## How was this patch tested?

I added new tests to `DataFramePivotSuite` and updated PySpark examples for the `pivot()` function.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21699 from MaxGekk/pivot-column.
2018-08-04 14:17:32 +08:00
Sean Owen 4c27663cb2
[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

Increase ZK timeout and harmonize configs across Kafka tests to resol…ve potentially flaky test failure

## How was this patch tested?

Existing tests

Author: Sean Owen <srowen@gmail.com>

Closes #21995 from srowen/SPARK-18057.3.
2018-08-03 16:22:54 -07:00
Onwuka Gideon 8c14276c33 Little typo
## What changes were proposed in this pull request?
Fixed little typo for a comment

## How was this patch tested?
Manual test

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Onwuka Gideon <dongidomed@gmail.com>

Closes #21992 from dongido001/patch-1.
2018-08-03 17:39:40 -05:00
Xingbo Jiang 92b48842b9 [SPARK-24954][CORE] Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled
## What changes were proposed in this pull request?

We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that we acquire some executors (but not enough to launch all the tasks in a barrier stage) and later release them due to executor idle time expire, and then acquire again).

We perform the check on job submit and fail fast if running a barrier stage with dynamic resource allocation enabled.

## How was this patch tested?

Added new test suite `BarrierStageOnSubmittedSuite` to cover all the fail fast cases that submitted a job containing one or more barrier stages.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21915 from jiangxb1987/SPARK-24954.
2018-08-03 09:36:56 -07:00
Sean Owen c32dbd6bd5 [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 2.12. It won't compile for 2.12 otherwise.

## How was this patch tested?

Existing tests.

Author: Sean Owen <srowen@gmail.com>

Closes #21955 from srowen/SPARK-18057.2.
2018-08-03 08:17:18 -05:00
DB Tsai 273b28404c
[SPARK-24993][SQL] Make Avro Fast Again
## What changes were proposed in this pull request?

When lindblombr at apple developed [SPARK-24855](https://github.com/apache/spark/pull/21847) to support specified schema on write, we found a performance regression in Avro writer for our dataset.

With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we miss which we need to investigate further.

Spark 2.4
```
spark git:(master) ./build/mvn -DskipTests clean package
spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar
```

Spark 2.3 + databricks avro
```
spark git:(branch-2.3) ./build/mvn -DskipTests clean package
spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
```

Current master:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|             2.95621|
| stddev|0.030895815479469294|
|    min|               2.915|
|    max|               3.049|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.31072999999999995|
| stddev|0.054139709842390006|
|    min|               0.259|
|    max|               0.692|
+-------+--------------------+
```

Current master with this PR:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  2.5804300000000002|
| stddev|0.011175600225672079|
|    min|               2.558|
|    max|                2.62|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.29922000000000004|
| stddev|0.058261961532514166|
|    min|               0.251|
|    max|               0.732|
+-------+--------------------+
```

Spark 2.3 + databricks avro:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  1.7730500000000005|
| stddev|0.025199156230863575|
|    min|               1.729|
|    max|               1.833|
+-------+--------------------+

+-------+-------------------+
|summary|          readTimes|
+-------+-------------------+
|  count|                100|
|   mean|            0.29715|
| stddev|0.05685643358850465|
|    min|              0.258|
|    max|              0.718|
+-------+-------------------+
```

The following is the test code to reproduce the result.
```scala
    spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
    val sparkSession = spark
    import sparkSession.implicits._
    val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
      val features = Array.fill(16000)(scala.math.random)
      (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features)
    }.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
    val size = df.count()

    // Write into ramdisk to rule out the disk IO impact
    val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"
    val n = 150
    val writeTimes = new Array[Double](n)
    var i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      df.write
        .format("com.databricks.spark.avro")
        .mode("overwrite")
        .save(tempSaveDir)
      val t2 = System.currentTimeMillis()
      writeTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    df.unpersist()

    // The first 50 runs are for warm-up
    val readTimes = new Array[Double](n)
    i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
      assert(readDF.count() == size)
      val t2 = System.currentTimeMillis()
      readTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
    spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
```

## How was this patch tested?

Existing tests.

Author: DB Tsai <d_tsai@apple.com>
Author: Brian Lindblom <blindblom@apple.com>

Closes #21952 from dbtsai/avro-performance-fix.
2018-08-03 07:43:54 +00:00
Devaraj K 53ca9755db
[SPARK-25009][CORE] Standalone Cluster mode application submit is not working
## What changes were proposed in this pull request?

It seems 'doRunMain()' has been removed accidentally by other PR and due to that the application submission is not happening, this PR adds back the 'doRunMain()' for standalone cluster submission.

## How was this patch tested?

I verified it manually by submitting application in standalone cluster mode, all the applications are submitting to the Master with the change.

Author: Devaraj K <devaraj@apache.org>

Closes #21979 from devaraj-kavali/SPARK-25009.
2018-08-03 07:23:56 +00:00
Yuhao Yang ebf33a333e [SAPRK-25011][ML] add prefix to __all__ in fpm.py
## What changes were proposed in this pull request?

jira: https://issues.apache.org/jira/browse/SPARK-25011

add prefix to __all__ in fpm.py

## How was this patch tested?

existing unit test.

Author: Yuhao Yang <yuhao.yang@intel.com>

Closes #21981 from hhbyyh/prefixall.
2018-08-03 15:02:41 +08:00
Dilip Biswal 19a4531913 [SPARK-24997][SQL] Enable support of MINUS ALL
## What changes were proposed in this pull request?
Enable support for MINUS ALL which was gated at AstBuilder.

## How was this patch tested?
Added tests in SQLQueryTestSuite and modify PlanParserSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21963 from dilipbiswal/minus-all.
2018-08-02 22:45:10 -07:00
Chris Horn b0d6967d45 [SPARK-24788][SQL] RelationalGroupedDataset.toString with unresolved exprs should not fail
## What changes were proposed in this pull request?
In the current master, `toString` throws an exception when `RelationalGroupedDataset` has unresolved expressions;
```
scala> spark.range(0, 10).groupBy("id")
res4: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [id: bigint], value: [id: bigint], type: GroupBy]

scala> spark.range(0, 10).groupBy('id)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'id
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:474)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:473)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.RelationalGroupedDataset.toString(RelationalGroupedDataset.scala:473)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
```
This pr fixed code to handle the unresolved case in `RelationalGroupedDataset.toString`.

Closes #21752

## How was this patch tested?
Added tests in `DataFrameAggregateSuite`.

Author: Chris Horn <chorn4033@gmail.com>
Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21964 from maropu/SPARK-24788.
2018-08-02 22:40:58 -07:00
Gengliang Wang f45d60a5a1 [SPARK-25002][SQL] Avro: revise the output record namespace
## What changes were proposed in this pull request?

Currently the output namespace is starting with ".", e.g. `.topLevelRecord`

Although it is valid according to Avro spec, we should remove the starting dot in case of failures when the output Avro file is read by other lib:

https://github.com/linkedin/goavro/pull/96

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21974 from gengliangwang/avro_namespace.
2018-08-03 13:28:44 +08:00
Dilip Biswal 73dd6cf9b5 [SPARK-24966][SQL] Implement precedence rules for set operations.
## What changes were proposed in this pull request?

Currently the set operations INTERSECT, UNION and EXCEPT are assigned the same precedence. This PR fixes the problem by giving INTERSECT  higher precedence than UNION and EXCEPT. UNION and EXCEPT operators are evaluated in the order in which they appear in the query from left to right.

This results in change in behavior because of the change in order of evaluations of set operators in a query. The old behavior is still preserved under a newly added config parameter.

Query `:`
```
SELECT * FROM t1
UNION
SELECT * FROM t2
EXCEPT
SELECT * FROM t3
INTERSECT
SELECT * FROM t4
```
Parsed plan before the change `:`
```
== Parsed Logical Plan ==
'Intersect false
:- 'Except false
:  :- 'Distinct
:  :  +- 'Union
:  :     :- 'Project [*]
:  :     :  +- 'UnresolvedRelation `t1`
:  :     +- 'Project [*]
:  :        +- 'UnresolvedRelation `t2`
:  +- 'Project [*]
:     +- 'UnresolvedRelation `t3`
+- 'Project [*]
   +- 'UnresolvedRelation `t4`
```
Parsed plan after the change `:`
```
== Parsed Logical Plan ==
'Except false
:- 'Distinct
:  +- 'Union
:     :- 'Project [*]
:     :  +- 'UnresolvedRelation `t1`
:     +- 'Project [*]
:        +- 'UnresolvedRelation `t2`
+- 'Intersect false
   :- 'Project [*]
   :  +- 'UnresolvedRelation `t3`
   +- 'Project [*]
      +- 'UnresolvedRelation `t4`
```
## How was this patch tested?
Added tests in PlanParserSuite, SQLQueryTestSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21941 from dilipbiswal/SPARK-24966.
2018-08-02 22:04:17 -07:00
Maxim Gekk b3f2911eeb [SPARK-24945][SQL] Switching to uniVocity 2.7.3
## What changes were proposed in this pull request?

In the PR, I propose to upgrade uniVocity parser from **2.6.3** to **2.7.3**. The recent version includes a fix for the SPARK-24645 issue and has better performance.

Before changes:
```
Parsing quoted values:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
One quoted string                           33336 / 34122          0.0      666727.0       1.0X

Wide rows with 1000 columns:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Select 1000 columns                         90287 / 91713          0.0       90286.9       1.0X
Select 100 columns                          31826 / 36589          0.0       31826.4       2.8X
Select one column                           25738 / 25872          0.0       25737.9       3.5X
count()                                       6931 / 7269          0.1        6931.5      13.0X
```
after:
```
Parsing quoted values:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
One quoted string                           33411 / 33510          0.0      668211.4       1.0X

Wide rows with 1000 columns:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Select 1000 columns                         88028 / 89311          0.0       88028.1       1.0X
Select 100 columns                          29010 / 32755          0.0       29010.1       3.0X
Select one column                           22936 / 22953          0.0       22936.5       3.8X
count()                                       6657 / 6740          0.2        6656.6      13.5X
```
Closes #21892

## How was this patch tested?

It was tested by `CSVSuite` and `CSVBenchmarks`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21969 from MaxGekk/univocity-2_7_3.
2018-08-03 08:33:28 +08:00
Gengliang Wang 7cf16a7fa4 [SPARK-24773] Avro: support logical timestamp type with different precisions
## What changes were proposed in this pull request?

Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29

To specify the output timestamp type, use Dataframe option `outputTimestampType`  or SQL config `spark.sql.avro.outputTimestampType`.  The supported values are
* `TIMESTAMP_MICROS`
* `TIMESTAMP_MILLIS`

The default output type is `TIMESTAMP_MICROS`
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21935 from gengliangwang/avro_timestamp.
2018-08-03 08:32:08 +08:00
Xingbo Jiang 29077a1d15 [SPARK-24795][CORE][FOLLOWUP] Combine BarrierTaskContext with BarrierTaskContextImpl
## What changes were proposed in this pull request?

According to https://github.com/apache/spark/pull/21758#discussion_r206746905 , current declaration of `BarrierTaskContext` didn't extend methods from `TaskContext`. Since `TaskContext` is an abstract class and we don't want to change it to a trait, we have to define class `BarrierTaskContext` directly.

## How was this patch tested?

Existing tests.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21972 from jiangxb1987/BarrierTaskContext.
2018-08-02 17:19:42 -07:00
Kazuaki Ishizaki bbdcc3bf61 [SPARK-22219][SQL] Refactor code to get a value for "spark.sql.codegen.comments"
## What changes were proposed in this pull request?

This PR refactors code to get a value for "spark.sql.codegen.comments" by avoiding `SparkEnv.get.conf`. This PR uses `SQLConf.get.codegenComments` since `SQLConf.get` always returns an instance of `SQLConf`.

## How was this patch tested?

Added test case to `DebuggingSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19449 from kiszk/SPARK-22219.
2018-08-02 18:19:04 -05:00
Liang-Chi Hsieh d0bc3ed679
[SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query
## What changes were proposed in this pull request?

`Uuid`'s results depend on random seed given during analysis. Thus under streaming query, we will have the same uuids in each execution. This seems to be incorrect for streaming query execution.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21854 from viirya/uuid_in_streaming.
2018-08-02 15:35:46 -07:00
Takeshi Yamamuro efef55388f [SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused
## What changes were proposed in this pull request?
In the current master, `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange`. Then, `ReuseExchange` removes some duplicate exchange and the actual number of registered exchanges changes. Finally, the assertion in `ExchangeCoordinator` fails because the logical number of exchanges and the actual number of registered exchanges become different;
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201

This pr fixed the issue and the code to reproduce this is as follows;
```
scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
...
```

## How was this patch tested?
Added tests in `ExchangeCoordinatorSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21754 from maropu/SPARK-24705-2.
2018-08-02 13:05:36 -07:00
Takuya UESHIN 02f967795b [SPARK-23908][SQL] Add transform function.
## What changes were proposed in this pull request?

This pr adds `transform` function which transforms elements in an array using the function.
Optionally we can take the index of each element as the second argument.

```sql
> SELECT transform(array(1, 2, 3), x -> x + 1);
 array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
 array(1, 3, 5)
```

## How was this patch tested?

Added tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21954 from ueshin/issues/SPARK-23908/transform.
2018-08-02 13:00:33 -07:00
Takuya UESHIN 0df6bf8829 [BUILD] Fix lint-python.
## What changes were proposed in this pull request?

This pr fixes lint-python.

```
./python/pyspark/accumulators.py:231:9: E306 expected 1 blank line before a nested definition, found 0
./python/pyspark/accumulators.py:257:101: E501 line too long (107 > 100 characters)
./python/pyspark/accumulators.py:264:1: E302 expected 2 blank lines, found 1
./python/pyspark/accumulators.py:281:1: E302 expected 2 blank lines, found 1
```

## How was this patch tested?

Executed lint-python manually.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21973 from ueshin/issues/build/1/fix_lint-python.
2018-08-03 03:18:46 +09:00
Xingbo Jiang 38e4699c97 [SPARK-24820][SPARK-24821][CORE] Fail fast when submitted job contains a barrier stage with unsupported RDD chain pattern
## What changes were proposed in this pull request?

Check on job submit to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The following patterns are not supported:
- Ancestor RDDs that have different number of partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD);
- An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).

## How was this patch tested?

Add test cases in `BarrierStageOnSubmittedSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21927 from jiangxb1987/SPARK-24820.
2018-08-02 09:36:26 -07:00
Marco Gaido ad2e636628 [SPARK-24598][DOCS] State in the documentation the behavior when arithmetic operations cause overflow
## What changes were proposed in this pull request?

According to the discussion in https://github.com/apache/spark/pull/21599, changing the behavior of arithmetic operations so that they can check for overflow is not nice in a minor release. What we can do for 2.4 is warn users about the current behavior in the documentation, so that they are aware of the issue and can take proper actions.

## How was this patch tested?

NA

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21967 from mgaido91/SPARK-24598_doc.
2018-08-02 09:28:13 -07:00
LucaCanali 15fc237226 Updates to Accumulators 2018-08-02 10:03:22 -05:00
Wenchen Fan f04cd67094 [MINOR] remove dead code in ExpressionEvalHelper
## What changes were proposed in this pull request?

This addresses https://github.com/apache/spark/pull/21236/files#r207078480

both https://github.com/apache/spark/pull/21236 and https://github.com/apache/spark/pull/21838 add a InternalRow result check to ExpressionEvalHelper and becomes duplicated.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21958 from cloud-fan/minor.
2018-08-02 09:26:27 -05:00
Kaya Kupferschmidt d182b3d34d [SPARK-24742] Fix NullPointerexception in Field Metadata
## What changes were proposed in this pull request?

This pull request provides a fix for SPARK-24742: SQL Field MetaData was throwing an Exception in the hashCode method when a "null" Metadata was added via "putNull"

## How was this patch tested?

A new unittest is provided in org/apache/spark/sql/types/MetadataSuite.scala

Author: Kaya Kupferschmidt <k.kupferschmidt@dimajix.de>

Closes #21722 from kupferk/SPARK-24742.
2018-08-02 22:23:24 +08:00
Kaya Kupferschmidt 7be6fc3c77 [SPARK-24742] Fix NullPointerexception in Field Metadata
## What changes were proposed in this pull request?

This pull request provides a fix for SPARK-24742: SQL Field MetaData was throwing an Exception in the hashCode method when a "null" Metadata was added via "putNull"

## How was this patch tested?

A new unittest is provided in org/apache/spark/sql/types/MetadataSuite.scala

Author: Kaya Kupferschmidt <k.kupferschmidt@dimajix.de>

Closes #21722 from kupferk/SPARK-24742.
2018-08-02 09:22:21 -05:00
Xiao Li 46110a589f [SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalPlan Node
## What changes were proposed in this pull request?
Remove the AnalysisBarrier LogicalPlan node, which is useless now.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21962 from gatorsmile/refactor2.
2018-08-02 22:20:41 +08:00
Stavros Kontopoulos a65736996b [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support
## What changes were proposed in this pull request?
This PR addresses issues 2,3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2).

* We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue, either add () at the end of the closure or use the trick described in the doc. Otherwise overloading resolution does not work (we are not going to eliminate either of the methods) here. Compiler tries to adapt to Unit and makes these two methods candidates for overloading, when there is polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good but it serves its purpose as we need to support two different uses for method: `addTaskCompletionListener`. One that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue3).

Note: regarding issue 1 in the doc the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None```

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21930 from skonto/scala2.12-sup.
2018-08-02 09:17:09 -05:00
Xingbo Jiang 275415777b [SPARK-24795][CORE][FOLLOWUP] Kill all running tasks when a task in a barrier stage fail
## What changes were proposed in this pull request?

Kill all running tasks when a task in a barrier stage fail in the middle. `TaskScheduler`.`cancelTasks()` will also fail the job, so we implemented a new method `killAllTaskAttempts()` to just kill all running tasks of a stage without cancel the stage/job.

## How was this patch tested?

Add new test cases in `TaskSchedulerImplSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21943 from jiangxb1987/killAllTasks.
2018-08-02 20:54:36 +08:00
zhengruifeng 57d994994d [SPARK-24557][ML] ClusteringEvaluator support array input
## What changes were proposed in this pull request?
ClusteringEvaluator support array input

## How was this patch tested?
added tests

Author: zhengruifeng <ruifengz@foxmail.com>

Closes #21563 from zhengruifeng/clu_eval_support_array.
2018-08-01 23:46:01 -07:00
Xiao Li 166f346185 [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE
## What changes were proposed in this pull request?
This PR is to refactor the code in AVERAGE by dsl.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21951 from gatorsmile/refactor1.
2018-08-01 23:00:17 -07:00
Sean Owen c9914cf049 [MINOR][DOCS] Add note about Spark network security
## What changes were proposed in this pull request?

In response to a recent question, this reiterates that network access to a Spark cluster should be disabled by default, and that access to its hosts and services from outside a private network should be added back explicitly.

Also, some minor touch-ups while I was at it.

## How was this patch tested?

N/A

Author: Sean Owen <srowen@gmail.com>

Closes #21947 from srowen/SecurityNote.
2018-08-02 10:22:52 +08:00
liuxian c5fe412928 [SPARK-18188][DOC][FOLLOW-UP] Add spark.broadcast.checksum to configuration
## What changes were proposed in this pull request?

This pr add `spark.broadcast.checksum` to configuration.

## How was this patch tested?
manually tested

Author: liuxian <liu.xian3@zte.com.cn>

Closes #21825 from 10110346/checksum_config.
2018-08-01 21:19:24 -05:00
Wenchen Fan ce084d3e06 [SPARK-24990][SQL] merge ReadSupport and ReadSupportWithSchema
## What changes were proposed in this pull request?

Regarding user-specified schema, data sources may have 3 different behaviors:
1. must have a user-specified schema
2. can't have a user-specified schema
3. can accept the user-specified if it's given, or infer the schema.

I added `ReadSupportWithSchema` to support these behaviors, following data source v1. But it turns out we don't need this extra interface. We can just add a `createReader(schema, options)` to `ReadSupport` and make it call `createReader(options)` by default.

TODO: also fix the streaming API in followup PRs.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21946 from cloud-fan/ds-schema.
2018-08-01 15:57:54 -07:00
Yuming Wang 9f558601e8 [SPARK-24937][SQL] Datasource partition table should load empty static partitions
## What changes were proposed in this pull request?

How to reproduce:
```sql
spark-sql> CREATE TABLE tbl AS SELECT 1;
spark-sql> CREATE TABLE tbl1 (c1 BIGINT, day STRING, hour STRING)
         > USING parquet
         > PARTITIONED BY (day, hour);
spark-sql> INSERT INTO TABLE tbl1 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl where 1=0;
spark-sql> SHOW PARTITIONS tbl1;
spark-sql> CREATE TABLE tbl2 (c1 BIGINT)
         > PARTITIONED BY (day STRING, hour STRING);
spark-sql> INSERT INTO TABLE tbl2 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl where 1=0;
spark-sql> SHOW PARTITIONS tbl2;
day=2018-07-25/hour=01
spark-sql>
```
1. Users will be confused about whether the partition data of `tbl1` is generated.
2. Inconsistent with Hive table behavior.

This pr fix this issues.

## How was this patch tested?

unit tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21883 from wangyum/SPARK-24937.
2018-08-01 13:58:29 -07:00
Adelbert Chang f5113ea8d7 [SPARK-24960][K8S] explicitly expose ports on driver container
https://issues.apache.org/jira/browse/SPARK-24960
## What changes were proposed in this pull request?

Expose ports explicitly in the driver container. The driver Service created expects to reach the driver Pod at specific ports which before this change, were not explicitly exposed and would likely cause connection issues (see https://github.com/apache-spark-on-k8s/spark/issues/617).

This is a port of the original PR created in the now-deprecated Kubernetes fork: https://github.com/apache-spark-on-k8s/spark/pull/618

## How was this patch tested?

Failure in https://github.com/apache-spark-on-k8s/spark/issues/617 reproduced on Kubernetes 1.6.x and 1.8.x. Built the driver image with this patch and observed fixed https://github.com/apache-spark-on-k8s/spark/issues/617 on Kubernetes 1.6.x.

Author: Adelbert Chang <Adelbert.Chang@target.com>

Closes #21884 from adelbertc/k8s-expose-driver-ports.
2018-08-01 13:57:33 -07:00
Kazuaki Ishizaki 95a9d5e3a5 [SPARK-23915][SQL] Add array_except function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_except`. The behavior of the function is based on Presto's one.

This function returns returns an array of the elements in array1 but not in array2.

Note: The order of elements in the result is not defined.

## How was this patch tested?

Added UTs.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21103 from kiszk/SPARK-23915.
2018-08-02 02:52:30 +08:00
Wenchen Fan defc54c69a [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
## What changes were proposed in this pull request?

This is a follow up of https://github.com/apache/spark/pull/21118 .

In https://github.com/apache/spark/pull/21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21921 from cloud-fan/row.
2018-08-01 21:39:35 +08:00
Marcelo Vanzin 1122754bd9 [SPARK-24653][TESTS] Avoid cross-job pollution in TestUtils / SpillListener.
There is a narrow race in this code that is caused when the code being
run in assertSpilled / assertNotSpilled runs more than a single job.

SpillListener assumed that only a single job was run, and so would only
block waiting for that single job to finish when `numSpilledStages` was
called. But some tests (like SQL tests that call `checkAnswer`) run more
than one job, and so that wait was basically a no-op.

This could cause the next test to install a listener to receive events
from the previous job. Which could cause test failures in certain cases.

The change fixes that race, and also uninstalls listeners after the
test runs, so they don't accumulate when the SparkContext is shared
among multiple tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21639 from vanzin/SPARK-24653.
2018-08-01 15:47:46 +08:00
Reynold Xin 1efffb7993 [SPARK-24982][SQL] UDAF resolution should not throw AssertionError
## What changes were proposed in this pull request?
When user calls anUDAF with the wrong number of arguments, Spark previously throws an AssertionError, which is not supposed to be a user-facing exception.  This patch updates it to throw AnalysisException instead, so it is consistent with a regular UDF.

## How was this patch tested?
Updated test case udaf.sql.

Author: Reynold Xin <rxin@databricks.com>

Closes #21938 from rxin/SPARK-24982.
2018-08-01 00:15:31 -07:00
Reynold Xin 1f7e22c72c [SPARK-24951][SQL] Table valued functions should throw AnalysisException
## What changes were proposed in this pull request?
Previously TVF resolution could throw IllegalArgumentException if the data type is null type. This patch replaces that exception with AnalysisException, enriched with positional information, to improve error message reporting and to be more consistent with rest of Spark SQL.

## How was this patch tested?
Updated the test case in table-valued-functions.sql.out, which is how I identified this problem in the first place.

Author: Reynold Xin <rxin@databricks.com>

Closes #21934 from rxin/SPARK-24951.
2018-07-31 22:25:40 -07:00
DB Tsai 5f3441e542 [SPARK-24893][SQL] Remove the entire CaseWhen if all the outputs are semantic equivalence
## What changes were proposed in this pull request?

Similar to SPARK-24890, if all the outputs of `CaseWhen` are semantic equivalence, `CaseWhen` can be removed.

## How was this patch tested?

Tests added.

Author: DB Tsai <d_tsai@apple.com>

Closes #21852 from dbtsai/short-circuit-when.
2018-08-01 10:31:02 +08:00
hyukjinkwon f4772fd26f [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to PyArrow 0.9.0)
## What changes were proposed in this pull request?

See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). Seems using `from_pandas` to convert decimals fails if encounters a value of `None`:

```python
import pyarrow as pa
import pandas as pd
from decimal import Decimal

pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 2))
```

**Arrow 0.8.0**

```
<pyarrow.lib.Decimal128Array object at 0x10a572c58>
[
  Decimal('3.14'),
  NA
]
```

**Arrow 0.9.0**

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal
```

This PR propose to work around this via Decimal NaN:

```python
pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), type=pa.decimal128(3, 2))
```

```
<pyarrow.lib.Decimal128Array object at 0x10ffd2e68>
[
  Decimal('3.14'),
  NA
]
```

## How was this patch tested?

Manually tested:

```bash
SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests
```

**Before**

```
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests.py", line 4672, in test_vectorized_udf_null_decimal
    self.assertEquals(df.collect(), res.collect())
  File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect
    sock_info = self._jdf.collectToPython()
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o51.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/pyspark/worker.py", line 320, in main
    process()
  File "/.../spark/python/pyspark/worker.py", line 315, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/.../spark/python/pyspark/serializers.py", line 241, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t)
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal
```

**After**

```
Running tests...
----------------------------------------------------------------------
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
.......S.............................
----------------------------------------------------------------------
Ran 37 tests in 21.980s
```

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21928 from HyukjinKwon/SPARK-24976.
2018-07-31 17:24:24 -07:00
Huaxin Gao 42dfe4f159 [SPARK-24973][PYTHON] Add numIter to Python ClusteringSummary
## What changes were proposed in this pull request?

Add numIter to Python version of ClusteringSummary

## How was this patch tested?

Modified existing UT test_multiclass_logistic_regression_summary

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21925 from huaxingao/spark-24973.
2018-07-31 15:23:11 -05:00