Commit graph

574 commits

Author SHA1 Message Date
Gabor Somogyi a006c85077 [SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
## What changes were proposed in this pull request?

According to the documentation, `groupIdPrefix` should be available for `streaming and batch`.
That is not the case, because the batch part is missing (see the usage sketch after the list below).

In this PR I've added:
* Structured Streaming test for v1 and v2 to cover `groupIdPrefix`
* Batch test for v1 and v2 to cover `groupIdPrefix`
* Added `groupIdPrefix` usage in batch
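
A minimal usage sketch of the option in batch mode, assuming an active `spark` session (broker and topic names are placeholders):

```scala
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
  .option("subscribe", "topic1")                      // placeholder topic
  .option("groupIdPrefix", "my-app")                  // consumer group ids will start with "my-app"
  .load()
```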

## How was this patch tested?

Additional + existing unit tests.

Closes #25030 from gaborgsomogyi/SPARK-28232.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-02 20:37:52 +08:00
Gabor Somogyi 0a4f985ca0 [SPARK-23098][SQL] Migrate Kafka Batch source to v2.
## What changes were proposed in this pull request?

The Kafka batch data source is using v1 at the moment. In this PR I've migrated it to v2. The majority of the change is moving code.

What this PR contains:
* useV1Sources usage fixed in `DataFrameReader` and `DataFrameWriter`
* `KafkaBatch` added to handle DSv2 batch reading
* `KafkaBatchWrite` added to handle DSv2 batch writing
* `KafkaBatchPartitionReader` extracted to share between batch and microbatch
* `KafkaDataWriter` extracted to share between batch, microbatch and continuous
* Batch related source/sink tests are now executing on v1 and v2 connectors
* A couple of classes are now hidden, some functions were moved, plus a couple of minor fixes

## How was this patch tested?

Existing + added unit tests.

Closes #24738 from gaborgsomogyi/SPARK-23098.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-02 09:47:30 +08:00
Dongjoon Hyun a7e16199f3 [SPARK-28174][BUILD][SS] Upgrade to Kafka 2.3.0
## What changes were proposed in this pull request?

This issue updates the Kafka dependency to 2.3.0 to bring in at least the following 9 client-side patches. Among them, the blocker issue [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703) was reported by the Apache Spark community. This dependency update will help us remove the workaround later.
- https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%202.3.0%20AND%20fixVersion%20NOT%20IN%20(2.2.0%2C%202.2.1)%20AND%20component%20%3D%20clients

The following is the full release note.
- https://www.apache.org/dist/kafka/2.3.0/RELEASE_NOTES.html

## How was this patch tested?

Pass the Jenkins.

Closes #24976 from dongjoon-hyun/SPARK-28174.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-27 07:49:24 -07:00
Jungtaek Lim (HeartSaVioR) 85e95b7d27 [SPARK-28142][SS] Use CaseInsensitiveStringMap for KafkaContinuousStream
## What changes were proposed in this pull request?

This patch addresses a missing spot where the options Map should be passed as a CaseInsensitiveStringMap - KafkaContinuousStream seems to be the only one missed.

Before this fix there is a related bug where `pollTimeoutMs` is always set to the default value: the key of `KafkaSourceProvider.CONSUMER_POLL_TIMEOUT` is `kafkaConsumer.pollTimeoutMs`, but the map provided as `sourceOptions` has lower-cased keys, so the lookup never matches.
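
A small sketch of why the map type matters (illustrative only, not the actual KafkaContinuousStream code): a plain key-lowercased map misses the camel-case key, while `CaseInsensitiveStringMap` does not.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val plain = Map("kafkaconsumer.polltimeoutms" -> "2000")   // key-lowercased options
plain.get("kafkaConsumer.pollTimeoutMs")                   // None -> falls back to the default

val ci = new CaseInsensitiveStringMap(Map("kafkaConsumer.pollTimeoutMs" -> "2000").asJava)
ci.getLong("kafkaconsumer.polltimeoutms", 512L)            // 2000L: the lookup ignores case
```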

## How was this patch tested?

N/A.

Closes #24942 from HeartSaVioR/MINOR-use-case-insensitive-map-for-kafka-continuous-source.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-24 22:20:32 +09:00
Liang-Chi Hsieh 527d936049 [SPARK-27798][SQL] from_avro shouldn't produces same value when converted to local relation
## What changes were proposed in this pull request?

When using `from_avro` to deserialize avro data to catalyst StructType format, if `ConvertToLocalRelation` is applied at the time, `from_avro` produces only the last value (overriding previous values).

The cause is that `AvroDeserializer` reuses its output row for StructType. Normally this is fine in Spark SQL, but `ConvertToLocalRelation` just uses `InterpretedProjection` to project local rows. `InterpretedProjection` creates a new row for each output, yet each of them still references the same nested row object from `AvroDeserializer`. By the end, the converted local relation contains only the last value.

I think there're two possible options:

1. Make `AvroDeserializer` output new row for StructType.
2. Use `InterpretedMutableProjection` in `ConvertToLocalRelation` and call `copy()` on output rows.

Option 2 was chosen because `ConvertToLocalRelation` previously also created new rows, so `InterpretedMutableProjection` + `copy()` shouldn't bring too much of a performance penalty. `ConvertToLocalRelation` is arguably less performance-critical than `AvroDeserializer`.
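
A toy sketch (not the Spark internals) of the failure mode: when a mutable buffer is reused and the projection doesn't `copy()`, every collected element ends up referencing the last value written.

```scala
val reused = new Array[Any](1)                               // stands in for the reused nested row
val withoutCopy = (1 to 3).map { i => reused(0) = i; reused }
withoutCopy.map(_(0))                                        // Vector(3, 3, 3): all entries share the buffer

val withCopy = (1 to 3).map { i => reused(0) = i; reused.clone() }
withCopy.map(_(0))                                           // Vector(1, 2, 3): copying preserves each value
```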

## How was this patch tested?

Added test.

Closes #24805 from viirya/SPARK-27798.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-07 13:47:36 -07:00
Gabor Somogyi 911fadf33a [SPARK-27748][SS] Kafka consumer/producer password/token redaction.
## What changes were proposed in this pull request?

Kafka parameters are logged in several places, and the following parameters have to be redacted:
* Delegation token
* `ssl.truststore.password`
* `ssl.keystore.password`
* `ssl.key.password`

This PR contains:
* Spark central redaction framework used to redact passwords (`spark.redaction.regex`)
* Custom redaction added to handle `sasl.jaas.config` (delegation token)
* Redaction code added into consumer/producer code
* Test refactor
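
A hedged sketch of the central redaction mechanism mentioned above; the exact default pattern shipped with Spark may differ, so treat the regex as an assumption:

```scala
import org.apache.spark.SparkConf

// Any configuration key matching the pattern (e.g. ssl.keystore.password inside logged
// Kafka parameters) is printed as *********(redacted) instead of its value.
val conf = new SparkConf()
  .set("spark.redaction.regex", "(?i)secret|password|token")  // assumed pattern
```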

## How was this patch tested?

Existing + additional unit tests.

Closes #24627 from gaborgsomogyi/SPARK-27748.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-06-03 15:43:08 -07:00
Gabbi Merz 29e154b2f1 [SPARK-27858][SQL] Fix for avro deserialization on union types with multiple non-null types
## What changes were proposed in this pull request?

This PR aims to fix an issue with union Avro types that have more than one non-null branch (for instance `["string", "null", "int"]`), whose deserialization to a DataFrame would throw a `java.lang.ArrayIndexOutOfBoundsException`. The issue was that the `fieldWriter` relied on the index from the Avro schema before nulls were filtered out.
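
A hedged usage sketch, assuming a DataFrame `df` with a binary `value` column holding Avro records written with such a union schema:

```scala
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

val unionSchema =
  """{"type":"record","name":"r","fields":[{"name":"f","type":["string","null","int"]}]}"""
// Before this fix, deserializing records with this schema could throw ArrayIndexOutOfBoundsException.
val parsed = df.select(from_avro(col("value"), unionSchema).as("parsed"))
```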

## How was this patch tested?

A test for the case of multiple non-null values was added and the tests were run using sbt by running `testOnly org.apache.spark.sql.avro.AvroSuite`

Closes #24722 from gcmerz/master.

Authored-by: Gabbi Merz <gmerz@palantir.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-27 20:09:23 -07:00
DB Tsai a12de29c1a [SPARK-27838][SQL] Support user provided non-nullable avro schema for nullable catalyst schema without any null record
## What changes were proposed in this pull request?

When data is read from the sources, the catalyst schema is always nullable. Since Avro uses a union type to represent nullable fields, whenever a non-nullable Avro file is read and then written out, the schema is always changed.

This PR provides a solution for users to keep the Avro schema without being forced to use a union type.
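
A hedged sketch of the intended usage, assuming `df` has a nullable catalyst schema but contains no null records; supplying the non-nullable Avro schema via the `avroSchema` option keeps the output from being rewritten as a union with null:

```scala
val providedSchema =
  """{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":"long"}]}"""
df.write.format("avro")
  .option("avroSchema", providedSchema)  // user-provided, non-nullable Avro schema
  .save("/tmp/avro-out")                 // placeholder path
```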

## How was this patch tested?

One test is added.

Closes #24682 from dbtsai/avroNull.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-05-24 21:47:14 +00:00
Sean Owen 6c5827c723 [SPARK-27794][R][DOCS] Use https URL for CRAN repo
## What changes were proposed in this pull request?

Use https URL for CRAN repo (and for a Scala download in a Dockerfile)

## How was this patch tested?

Existing tests.

Closes #24664 from srowen/SPARK-27794.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-22 14:28:21 -07:00
Sean Owen 4d64ed8114 [SPARK-27796][MESOS] Remove obsolete spark-mesos Dockerfile example
## What changes were proposed in this pull request?

Remove obsolete spark-mesos Dockerfile example. This isn't tested and apparently hasn't been updated in 4 years.

## How was this patch tested?

N/A

Closes #24667 from srowen/SPARK-27796.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-21 10:53:55 -07:00
DB Tsai 808d9d05fc [SPARK-27762][SQL] Support user provided avro schema for writing fields with different ordering
## What changes were proposed in this pull request?

The Spark Avro reader supports reading Avro files with a user-provided schema whose fields are in a different order. However, the Avro writer doesn't support this feature. This PR adds the same support to the writer.

## How was this patch tested?

New test is added.

Closes #24635 from dbtsai/avroFix.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Brian Lindblom <blindblom@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-05-21 17:34:19 +00:00
Gabor Somogyi efa303581a [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
## What changes were proposed in this pull request?

Kafka-related Spark parameters have to start with `spark.kafka.` and not with `spark.sql.`. Because of this I've renamed `spark.sql.kafkaConsumerCache.capacity`.

Since Kafka consumer caching is not documented, I've added documentation for it as well.
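
A hedged sketch of setting the renamed configuration, assuming the new key follows the `spark.kafka.` pattern described above as `spark.kafka.consumer.cache.capacity`:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kafka.consumer.cache.capacity", "128")  // assumed new name; replaces spark.sql.kafkaConsumerCache.capacity
```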

## How was this patch tested?

Existing + added unit test.

```
cd docs
SKIP_API=1 jekyll build
```
and manual webpage check.

Closes #24590 from gaborgsomogyi/SPARK-27687.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-15 10:42:09 -07:00
Sean Owen a10608cb82 [SPARK-27680][CORE][SQL][GRAPHX] Remove usage of Traversable
## What changes were proposed in this pull request?

This removes usage of `Traversable`, which is removed in Scala 2.13. This is mostly an internal change, except for the change in the `SparkConf.setAll` method. See additional comments below.

## How was this patch tested?

Existing tests.

Closes #24584 from srowen/SPARK-27680.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-14 09:14:56 -05:00
hehuiyuan 5a8aad01c2 [SPARK-27343][KAFKA][SS] Avoid hardcoded for spark-sql-kafka-0-10
## What changes were proposed in this pull request?

[SPARK-27343](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27343)

Based on the previous PR: https://github.com/apache/spark/pull/24270

Extract the parameters and build `ConfigEntry` objects for them.

For example, for the parameter `spark.kafka.producer.cache.timeout` we build:
```
private[kafka010] val PRODUCER_CACHE_TIMEOUT =
    ConfigBuilder("spark.kafka.producer.cache.timeout")
      .doc("The expire time to remove the unused producers.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10m")
```

Closes #24574 from hehuiyuan/hehuiyuan-patch-9.

Authored-by: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-12 10:46:18 -05:00
Gengliang Wang 78a403fab9 [SPARK-27627][SQL] Make option "pathGlobFilter" as a general option for all file sources
## What changes were proposed in this pull request?

### Background:
The data source option `pathGlobFilter` was introduced for the binary file format: https://github.com/apache/spark/pull/24354 . It can be used for filtering file names, e.g. reading only `.png` files while there are `.json` files in the same directory.

### Proposal:
Make the option `pathGlobFilter` a general option for all file sources. The path filtering happens during path globbing on the driver, as shown in the sketch below.
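
A usage sketch of the generalized option (the `binaryFile` source it originated from is used here, but after this change any file source accepts it), assuming an active `spark` session:

```scala
// Only *.png files under the directory become input partitions; filtering happens while
// globbing paths on the driver, not inside executor-side scan tasks.
val pngs = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")
  .load("/data/images")                // placeholder path
```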

### Motivation:
Filtering the file path names in file scan tasks on executors is kind of ugly.

### Impact:
1. The splitting of file partitions will be more balanced.
2. The metrics of file scan will be more accurate.
3. Users can use the option for reading other file sources.

## How was this patch tested?

Unit tests

Closes #24518 from gengliangwang/globFilter.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-09 08:41:43 +09:00
Wenchen Fan bae5baae52 [SPARK-27642][SS] make v1 offset extends v2 offset
## What changes were proposed in this pull request?

To move DS v2 to the catalyst module, we can't make v2 offset rely on v1 offset, as v1 offset is in sql/core.

## How was this patch tested?

existing tests

Closes #24538 from cloud-fan/offset.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-05-07 23:03:15 -07:00
gengjiaan 8329e7debd [SPARK-27649][SS] Unify the way use 'spark.network.timeout'
## What changes were proposed in this pull request?

For historical reasons, structured streaming still uses `spark.network.timeout` directly in some places, even though `org.apache.spark.internal.config.Network.NETWORK_TIMEOUT` is now available.
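
A small sketch of the unified access pattern; this is Spark-internal code (`conf` is an assumed `SparkConf` and the typed `get(ConfigEntry)` is only accessible inside Spark), shown only to illustrate replacing the hard-coded string:

```scala
// Inside org.apache.spark code:
import org.apache.spark.internal.config.Network

val timeoutSec = conf.get(Network.NETWORK_TIMEOUT)            // typed entry, resolved in seconds
// instead of: conf.getTimeAsSeconds("spark.network.timeout", "120s")
```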

## How was this patch tested?
Existing unit tests.

Closes #24545 from beliefer/unify-spark-network-timeout.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-08 11:43:03 +08:00
Gabor Somogyi 2f55809425 [SPARK-27294][SS] Add multi-cluster Kafka delegation token
## What changes were proposed in this pull request?

The current implementation doesn't support multi-cluster Kafka connections with delegation tokens. In this PR I've added this functionality.

What this PR contains:
* New way of configuration
* Multiple delegation token obtain/store/use functionality
* Documentation
* The change works on DStreams also
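
A hedged configuration sketch of the new per-cluster style; the identifiers `realm1`/`realm2` and the exact key names are illustrative:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kafka.clusters.realm1.auth.bootstrap.servers", "broker-a:9092")  // source cluster
  .set("spark.kafka.clusters.realm2.auth.bootstrap.servers", "broker-b:9092")  // sink cluster
  .set("spark.kafka.clusters.realm1.security.protocol", "SASL_SSL")
```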

## How was this patch tested?

Existing + additional unit tests.
Additionally tested on cluster.

Test scenario:

* Two 4-node clusters
* The two sets of 4 nodes are in different Kerberos realms
* Cross-Realm trust between the 2 realms
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512
* Artificial exceptions during processing
* Source reads from realm1, sink writes to realm2

Kafka broker settings:

* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

Closes #24305 from gaborgsomogyi/SPARK-27294.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-05-07 11:40:43 -07:00
Adi Muraru 8ef4da753d [SPARK-27610][YARN] Shade netty native libraries
## What changes were proposed in this pull request?

Fixed the `spark-<version>-yarn-shuffle.jar` artifact packaging to shade the native netty libraries:
- shade the `META-INF/native/libnetty_*` native libraries when packaging
the yarn shuffle service jar. This is required as the netty library loader
derives the library file name from the shaded package name.
- updated the `org/spark_project` shade package prefix to `org/sparkproject`
(i.e. removed underscore) as the former breaks the netty native lib loading.

This was causing the yarn external shuffle service to fail
when `spark.shuffle.io.mode=EPOLL`.

## How was this patch tested?
Manual tests

Closes #24502 from amuraru/SPARK-27610_master.

Authored-by: Adi Muraru <amuraru@adobe.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-05-07 10:47:36 -07:00
Wenchen Fan 6ef45301a4 [SPARK-27579][SQL] remove BaseStreamingSource and BaseStreamingSink
## What changes were proposed in this pull request?

`BaseStreamingSource` and `BaseStreamingSink` is used to unify v1 and v2 streaming data source API in some code paths.

This PR removes these 2 interfaces, and let the v1 API extend v2 API to keep API compatibility.

The motivation is https://github.com/apache/spark/pull/24416 . We want to move data source v2 to catalyst module, but `BaseStreamingSource` and `BaseStreamingSink` are in sql/core.

## How was this patch tested?

existing tests

Closes #24471 from cloud-fan/streaming.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-06 20:41:57 +08:00
Dilip Biswal 6001d476ce [SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database
## What changes were proposed in this pull request?
**Description from JIRA**
For the JDBC option `query`, we generate an identifier name that starts with an underscore: s"(${subquery}) _SPARK_GEN_JDBC_SUBQUERY_NAME${curId.getAndIncrement()}". This is not supported by Oracle.
Oracle doesn't seem to support identifier names that start with a non-alphabetic character (unless quoted) and has length restrictions as well. [link](https://docs.oracle.com/cd/B19306_01/server.102/b14200/sql_elements008.htm)

In this PR, the generated alias name 'SPARK_GEN_JDBC_SUBQUERY_NAME<int value>' is changed to drop the "_" prefix, and the alias is also shortened so it does not exceed the identifier length limit.
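
A usage sketch of the `query` option whose generated alias this fixes (connection details are placeholders):

```scala
val employees = spark.read.format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/XEPDB1")          // placeholder Oracle URL
  .option("query", "SELECT id, name FROM emp WHERE dept = 'ENG'")   // wrapped as (query) <generated alias>
  .option("user", "scott")
  .option("password", "tiger")
  .load()
```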

## How was this patch tested?
Tests are added for MySQL, Postgres, Oracle and DB2 to ensure enough coverage.

Closes #24532 from dilipbiswal/SPARK-27596.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-05-05 21:52:23 -07:00
Gabor Somogyi fb6b19ab7c [SPARK-23014][SS] Fully remove V1 memory sink.
## What changes were proposed in this pull request?

There is a MemorySink v2 already so v1 can be removed. In this PR I've removed it completely.
What this PR contains:
* V1 memory sink removal
* V2 memory sink renamed to become the only implementation
* Since DSv2 sends exceptions in a chained format (linking them with the cause field), I've made the Python side compliant
* Adapted all the tests

## How was this patch tested?

Existing unit tests.

Closes #24403 from gaborgsomogyi/SPARK-23014.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-29 09:44:23 -07:00
Sean Owen 8a17d26784 [SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials
## What changes were proposed in this pull request?

I want to get rid of as much use of `scala.language.existentials` as possible for 3.0. It's a complicated language feature that generates warnings unless this value is imported. It might even be on the way out of Scala: https://contributors.scala-lang.org/t/proposal-to-remove-existential-types-from-the-language/2785

For Spark, it comes up mostly where the code plays fast and loose with generic types, not the advanced situations you'll often see referenced where this feature is explained. For example, it comes up in cases where a function returns something like `(String, Class[_])`. Scala doesn't like matching this to any other instance of `(String, Class[_])` because doing so requires inferring the existence of some type that satisfies both. Seems obvious if the generic type is a wildcard, but, not technically something Scala likes to let you get away with.

This is a large PR, and it only gets rid of _most_ instances of `scala.language.existentials`. The change should be all compile-time and shouldn't affect APIs or logic.

Many of the changes simply touch up sloppiness about generic types, making the known correct value explicit in the code.

Some fixes involve being more explicit about the existence of generic types in methods. For instance, `def foo(arg: Class[_])` seems innocent enough but should really be declared `def foo[T](arg: Class[T])` to let Scala select and fix a single type when evaluating calls to `foo`.
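
A minimal illustration of that pattern (hypothetical method names):

```scala
// Wildcard signature: legal, but values of type Class[_] flowing through tuples and
// comparisons are what forces the scala.language.existentials import elsewhere.
def handle(arg: Class[_]): String = arg.getName

// Declaring the type parameter lets Scala fix a single T at each call site instead of
// inferring an existential type.
def handleTyped[T](arg: Class[T]): String = arg.getName

handleTyped(classOf[String])   // T = String
```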

For kind of surprising reasons, this comes up in places where code evaluates a tuple of things that involve a generic type, but is OK if the two parts of the tuple are evaluated separately.

One key change was altering `Utils.classForName(...): Class[_]` to the more correct `Utils.classForName[T](...): Class[T]`. This caused a number of small but positive changes to callers that otherwise had to cast the result.

In several tests, `Dataset[_]` was used where `DataFrame` seems to be the clear intent.

Finally, in a few cases in MLlib, the return type `this.type` was used where there are no subclasses of the class that uses it. This really isn't needed and causes issues for Scala reasoning about the return type. These are just changed to be concrete classes as return types.

After this change, we have only a few classes that still import `scala.language.existentials` (because modifying them would require extensive rewrites to fix) and no build warnings.

## How was this patch tested?

Existing tests.

Closes #24431 from srowen/SPARK-27536.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-04-29 11:02:01 -05:00
Koert Kuipers 7b367bfc86 [SPARK-27477][BUILD] Kafka token provider should have provided dependency on Spark
## What changes were proposed in this pull request?

Change spark-token-provider-kafka-0-10 dependency on spark-core to be provided

## How was this patch tested?

Ran existing unit tests

Closes #24384 from koertkuipers/feat-kafka-token-provider-fix-deps.

Authored-by: Koert Kuipers <koert@tresata.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-26 11:52:08 -07:00
Wenchen Fan 85fd552ed6 [SPARK-27190][SQL] add table capability for streaming
## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/24012 , to add the corresponding capabilities for streaming.

## How was this patch tested?

existing tests

Closes #24129 from cloud-fan/capability.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-04-26 15:44:23 +08:00
uncleGen d2656aaecd [SPARK-27494][SS] Null values don't work in Kafka source v2
## What changes were proposed in this pull request?

Right now Kafka source v2 doesn't support null values. The issue is in org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow which doesn't handle null values.

## How was this patch tested?

add new unit tests

Closes #24441 from uncleGen/SPARK-27494.

Authored-by: uncleGen <hustyugm@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-04-26 14:25:31 +08:00
Sean Owen 596a5ff273 [MINOR][BUILD] Update genjavadoc to 0.13
## What changes were proposed in this pull request?

Kind of related to https://github.com/gatorsmile/spark/pull/5 - let's update genjavadoc to see if it generates fewer spurious javadoc errors to begin with.

## How was this patch tested?

Existing docs build

Closes #24443 from srowen/genjavadoc013.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-04-24 13:44:48 +09:00
Gabor Somogyi 94adffa8b1 [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
## What changes were proposed in this pull request?

`Krb5LoginModule` supports a debug parameter which is not yet exposed on the Spark side. This configuration makes it easier to debug authentication issues against Kafka.

In this PR the `Krb5LoginModule` debug flag is controlled by either `sun.security.krb5.debug` or `com.ibm.security.krb5.Krb5Debug`.

Additionally, I found some hardcoded values like `ssl.truststore.location`, etc., which could be error prone if Kafka ever changes them, so in such cases the Kafka-defined constant is used instead.
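
A hedged sketch of enabling the debugging from the submitter's side; the JVM system property is what controls the generated `debug=true` flag in the JAAS entry:

```scala
// Typically passed as: --conf "spark.driver.extraJavaOptions=-Dsun.security.krb5.debug=true"
// (plus the executor equivalent); setting it programmatically works for local experiments.
System.setProperty("sun.security.krb5.debug", "true")
```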

## How was this patch tested?

Existing + additional unit tests + on cluster.

Closes #24204 from gaborgsomogyi/SPARK-27270.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-11 16:39:40 -07:00
Sean Owen 4ec7f631aa [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition
## What changes were proposed in this pull request?

Fix build warnings -- see some details below.

But mostly, remove use of postfix syntax where it causes warnings without the `scala.language.postfixOps` import. This is mostly in expressions like "120000 milliseconds", which I'd like to simplify to things like "2.minutes" anyway.
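
An illustration of the two forms (the postfix one warns unless the extra language import is present):

```scala
import scala.concurrent.duration._
import scala.language.postfixOps     // needed only for the postfix form below

val postfix  = 120000 milliseconds   // postfix syntax: warns without the language import
val explicit = 2.minutes             // equivalent, no language import required
```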

## How was this patch tested?

Existing tests.

Closes #24314 from srowen/SPARK-27404.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-04-11 13:43:44 -05:00
gengjiaan 53e31e2ca1 [SPARK-27399][STREAMING][KAFKA] Arrange scattered config and reduce hardcode for kafka 10.
## What changes were proposed in this pull request?

I found a lot of scattered config in `Kafka` streaming. I think these configs should be arranged in a unified place.

## How was this patch tested?

No unit tests needed.

Closes #24267 from beliefer/arrange-scattered-streaming-kafka-config.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-04-06 18:05:15 -05:00
Sean Owen d4420b455a [SPARK-27323][CORE][SQL][STREAMING] Use Single-Abstract-Method support in Scala 2.12 to simplify code
## What changes were proposed in this pull request?

Use Single Abstract Method syntax where possible (and minor related cleanup). Comments below. No logic should change here.
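
A small before/after illustration of the Single Abstract Method conversion:

```scala
// Pre-2.12 style: explicit anonymous class for a single-method interface.
val verbose: Runnable = new Runnable { override def run(): Unit = println("tick") }

// Scala 2.12 SAM syntax: a lambda is converted to the functional interface directly.
val concise: Runnable = () => println("tick")
```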

## How was this patch tested?

Existing tests.

Closes #24241 from srowen/SPARK-27323.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-04-02 07:37:05 -07:00
Zhu, Lipeng 1f2564d0b0 [SPARK-27155][TEST] Parameterize Oracle docker image name
## What changes were proposed in this pull request?

Update Oracle docker image name.

## How was this patch tested?

./build/mvn test -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12

Closes #24086 from lipzhu/SPARK-27155.

Authored-by: Zhu, Lipeng <lipzhu@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-25 15:17:41 -05:00
Sean Owen 8bc304f97e [SPARK-26132][BUILD][CORE] Remove support for Scala 2.11 in Spark 3.0.0
## What changes were proposed in this pull request?

Remove Scala 2.11 support in build files and docs, and in various parts of code that accommodated 2.11. See some targeted comments below.

## How was this patch tested?

Existing tests.

Closes #23098 from srowen/SPARK-26132.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-25 10:46:42 -05:00
Dongjoon Hyun 6ef94e0f18 [SPARK-27260][SS] Upgrade to Kafka 2.2.0
## What changes were proposed in this pull request?

This PR aims to update Kafka dependency to 2.2.0 to bring the following improvement and bug fixes.
- https://issues.apache.org/jira/projects/KAFKA/versions/12344063

Due to [KAFKA-4453](https://issues.apache.org/jira/browse/KAFKA-4453), data plane API and controller plane API are separated. Apache Spark needs the following changes.
```scala
- servers.head.apis.metadataCache
+ servers.head.dataPlaneRequestProcessor.metadataCache
```

## How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #24190 from dongjoon-hyun/SPARK-27260.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-24 17:39:57 -07:00
Dongjoon Hyun 4d5247778a [SPARK-27197][SQL][TEST] Add ReadNestedSchemaTest for file-based data sources
## What changes were proposed in this pull request?

The reader schema is said to be evolved (or projected) when it changes after the data has been written. Apache Spark file-based data sources have test coverage for that; e.g. [ReadSchemaSuite.scala](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala). This PR aims to add test coverage for nested columns by adding and hiding nested columns.

## How was this patch tested?

Pass the Jenkins with newly added tests.

Closes #24139 from dongjoon-hyun/SPARK-27197.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-03-20 00:22:05 +00:00
Zhu, Lipeng 99c427b1d3 [SPARK-27168][SQL][TEST] Add docker integration test for MsSql server
## What changes were proposed in this pull request?

This PR aims to add a JDBC integration test for MsSql server.

## How was this patch tested?

```
./build/mvn clean install -DskipTests
./build/mvn test -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12 \
-Dtest=none -DwildcardSuites=org.apache.spark.sql.jdbc.MsSqlServerIntegrationSuite
```

Closes #24099 from lipzhu/SPARK-27168.

Lead-authored-by: Zhu, Lipeng <lipzhu@ebay.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: Lipeng Zhu <lipzhu@icloud.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-19 08:43:23 -07:00
Dongjoon Hyun 26e9849cb4 [SPARK-27195][SQL][TEST] Add AvroReadSchemaSuite
## What changes were proposed in this pull request?

The reader schema is said to be evolved (or projected) when it changes after the data has been written. Apache Spark file-based data sources have test coverage for that, [ReadSchemaSuite.scala](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala). This PR aims to add `AvroReadSchemaSuite` to ensure minimal consistency among file-based data sources and prevent a future regression in the Avro data source.

## How was this patch tested?

Pass the Jenkins with the newly added test suite.

Closes #24135 from dongjoon-hyun/SPARK-27195.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-03-18 20:10:30 -07:00
Ryan Blue e348f14259 [SPARK-26811][SQL] Add capabilities to v2.Table
## What changes were proposed in this pull request?

This adds a new method, `capabilities`, to `v2.Table` that returns a set of `TableCapability`. Capabilities are used to fail queries during analysis checks, such as `V2WriteSupportCheck`, when the table does not support an operation like truncation.
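
A hedged sketch of a `Table` implementation advertising capabilities (package locations moved during the DSv2 refactor, so treat the imports as approximate):

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

class ExampleTable extends Table {
  override def name(): String = "example"
  override def schema(): StructType = new StructType().add("id", "long")
  // Analysis checks such as V2WriteSupportCheck consult this set and fail unsupported plans.
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ, TableCapability.TRUNCATE)
}
```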

## How was this patch tested?

Existing tests for regressions, added new analysis suite, `V2WriteSupportCheckSuite`, for new capability checks.

Closes #24012 from rdblue/SPARK-26811-add-capabilities.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-18 18:25:11 +08:00
DylanGuedes 2fecc4a3fe [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation)
## What changes were proposed in this pull request?

This changes calls to `AdminUtils`, currently used to create and delete topics in Kafka tests. With this change, the tests rely on `AdminClient`, the recommended way from now on.
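
A hedged sketch of the `AdminClient`-based topic management that replaces the deprecated `AdminUtils` calls (broker address is a placeholder):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
val admin = AdminClient.create(props)
admin.createTopics(Collections.singleton(new NewTopic("test-topic", 1, 1.toShort))).all().get()
admin.deleteTopics(Collections.singleton("test-topic")).all().get()
admin.close()
```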

## How was this patch tested?
I ran all unit tests and they pass. Since this code path is already well tested, I thought the API change wouldn't require new tests as long as the current tests keep working.

Closes #24071 from DylanGuedes/spark-27138.

Authored-by: DylanGuedes <djmgguedes@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-14 09:20:30 -05:00
Wenchen Fan 2a80a4cd39 [SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions
## What changes were proposed in this pull request?

It's a little awkward to have 2 different classes (`CaseInsensitiveStringMap` and `DataSourceOptions`) to represent the options in the data source and catalog APIs.

This PR merges these 2 classes, while keeping the name `CaseInsensitiveStringMap`, which is more precise.

## How was this patch tested?

existing tests

Closes #24025 from cloud-fan/option.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-14 01:23:27 +08:00
Wenchen Fan d3813d8b21 [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
## What changes were proposed in this pull request?

According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing), the life cycle of `StreamingWrite` should be the same as the read side `MicroBatch/ContinuousStream`, i.e. each run of the stream query, instead of each epoch.

This PR fixes it.

## How was this patch tested?

existing tests

Closes #23981 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-03-13 19:47:54 +08:00
Gabor Somogyi 98a8725e66 [SPARK-27022][DSTREAMS] Add kafka delegation token support.
## What changes were proposed in this pull request?

It adds Kafka delegation token support for DStreams. Please be aware that, since a native Kafka sink is not available for DStreams, this PR covers delegation token usage only on the consumer side.

What this PR contains:
* Usage of token through dynamic JAAS configuration
* `KafkaConfigUpdater` moved to `kafka-0-10-token-provider`
* `KafkaSecurityHelper` functionality moved into `KafkaTokenUtil`
* Documentation

## How was this patch tested?

Existing unit tests + on cluster.

Long running Kafka to file tests on 4 node cluster with randomly thrown artificial exceptions.

Test scenario:

* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512

Kafka broker settings:

* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

Every 7.5 minutes a new delegation token is obtained from the Kafka broker (10 min * 0.75).
When a token expired after 10 minutes (Spark obtains a new one and doesn't renew the old), the broker's expiry thread, which runs every 5 minutes, invalidated the expired token, and an artificial exception was thrown inside the Spark application (in which case Spark closes the connection); the latest delegation token was then picked up correctly.

cd docs/
SKIP_API=1 jekyll build
Manual webpage check.

Closes #23929 from gaborgsomogyi/SPARK-27022.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-03-07 11:36:37 -08:00
Wenchen Fan 382d5a82b0 [SPARK-26956][SS] remove streaming output mode from data source v2 APIs
## What changes were proposed in this pull request?

Similar to `SaveMode`, we should remove streaming `OutputMode` from the data source v2 API, and use operations that have clear semantics.

The changes are:
1. append mode: create `StreamingWrite` directly. By default, the `WriteBuilder` will create `Write` to append data.
2. complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and appending new data of the current epoch. `SupportsTruncate` has exactly the same semantic.
3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.

The behavior changes:
1. all the v2 sinks (foreach, console, memory, kafka, noop) don't support update mode. The fact is, previously all the v2 sinks implemented update mode incorrectly; none of them could really support it.
2. kafka sink doesn't support complete mode. The fact is, the kafka sink can only append data.

## How was this patch tested?

existing tests

Closes #23859 from cloud-fan/update.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-03-03 22:20:31 -08:00
liuxian 02bbe977ab [MINOR] Remove unnecessary gets when getting a value from map.
## What changes were proposed in this pull request?

Remove the redundant `get` when getting a value from a `Map` given a key.
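
An illustration of the pattern being cleaned up (a toy example, not the actual call sites):

```scala
val m = Map("a" -> 1)
val before = m.get("a").get   // Option lookup followed by an unconditional .get
val after  = m("a")           // apply() is equivalent (both throw when the key is missing)
```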

## How was this patch tested?

N/A

Closes #23901 from 10110346/removegetfrommap.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-01 11:48:07 -06:00
Gabor Somogyi 76e0b6bafb [SPARK-27002][SS] Get kafka delegation tokens right before consumer/producer created
## What changes were proposed in this pull request?

Spark does not always pick up the latest Kafka delegation token, even if a new one was properly obtained.
In this PR I'm setting delegation tokens right before `KafkaConsumer` and `KafkaProducer` creation, to be on the safe side.

## How was this patch tested?

Long running Kafka to Kafka tests on 4 node cluster with randomly thrown artificial exceptions.

Test scenario:
* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512

Kafka broker settings:
* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

Every 7.5 minutes a new delegation token is obtained from the Kafka broker (10 min * 0.75).
But when a token expired after 10 minutes (Spark obtains a new one and doesn't renew the old), the broker's expiry thread, which runs every 5 minutes, invalidated the expired token, and an artificial exception was thrown inside the Spark application (in which case Spark closes the connection); then the latest delegation token was not always picked up.

Closes #23906 from gaborgsomogyi/SPARK-27002.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-27 10:07:02 -08:00
liuxian 7912dbb88f [MINOR] Simplify boolean expression
## What changes were proposed in this pull request?

Comparing whether a Boolean expression is equal to true is redundant.
For example:
The datatype of `a` is boolean.
Before:
if (a == true)
After:
if (a)

## How was this patch tested?
N/A

Closes #23884 from 10110346/simplifyboolean.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-27 08:38:00 -06:00
Wenchen Fan f85ed9a3e5 [SPARK-26785][SQL] data source v2 API refactor: streaming write
## What changes were proposed in this pull request?

Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).

The major changes:
1. rename `StreamingWriteSupport` to `StreamingWrite`
2. add `WriteBuilder.buildForStreaming`
3. update existing sinks, to move the creation of `StreamingWrite` to `Table`

## How was this patch tested?

existing tests

Closes #23702 from cloud-fan/stream-write.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-02-18 16:17:24 -08:00
Gabor Somogyi 28ced387b9 [SPARK-26772][YARN] Delete ServiceCredentialProvider and make HadoopDelegationTokenProvider a developer API
## What changes were proposed in this pull request?

`HadoopDelegationTokenProvider` has basically the same functionality as `ServiceCredentialProvider`, so the interfaces can be merged.

`YARNHadoopDelegationTokenManager` currently loads `ServiceCredentialProvider`s in one step. The drawback of this is that if one provider fails, all the others are not loaded. `HadoopDelegationTokenManager` loads `HadoopDelegationTokenProvider`s independently, so it provides more robust behaviour.

In this PR I've made the following changes:
* Deleted `YARNHadoopDelegationTokenManager` and `ServiceCredentialProvider`
* Made `HadoopDelegationTokenProvider` a `DeveloperApi`

## How was this patch tested?

Existing unit tests.

Closes #23686 from gaborgsomogyi/SPARK-26772.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-02-15 14:43:13 -08:00
Hyukjin Kwon c406472970 [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility
## What changes were proposed in this pull request?

Currently, it looks like, to use `from_avro` and `to_avro` from the Java API side, one has to write:

```java
import static org.apache.spark.sql.avro.package$.MODULE$;

MODULE$.to_avro
MODULE$.from_avro
```

This PR deprecates both functions under the `avro` package and moves them into a `functions` object, following the pattern of `org.apache.spark.sql.functions`.

Therefore, Java side can import:

```java
import static org.apache.spark.sql.avro.functions.*;
```

and Scala side can import:

```scala
import org.apache.spark.sql.avro.functions._
```

## How was this patch tested?

Manually tested, and unit tests for Java APIs were added.

Closes #23784 from HyukjinKwon/SPARK-26870.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-15 10:24:35 +08:00
Maxim Gekk a829234df3 [SPARK-26817][CORE] Use System.nanoTime to measure time intervals
## What changes were proposed in this pull request?

In the PR, I propose to use `System.nanoTime()` instead of `System.currentTimeMillis()` in measurements of time intervals.

`System.currentTimeMillis()` returns current wallclock time and will follow changes to the system clock. Thus, negative wallclock adjustments can cause timeouts to "hang" for a long time (until wallclock time has caught up to its previous value again). This can happen when ntpd does a "step" after the network has been disconnected for some time. The most canonical example is during system bootup when DHCP takes longer than usual. This can lead to failures that are really hard to understand/reproduce. `System.nanoTime()` is guaranteed to be monotonically increasing irrespective of wallclock changes.
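
A small sketch of the measurement pattern this change standardizes on:

```scala
val start = System.nanoTime()
Thread.sleep(25)                                          // stand-in for the measured work
val elapsedMs = (System.nanoTime() - start) / 1000000L    // monotonic: immune to wall-clock steps
```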

## How was this patch tested?

By existing test suites.

Closes #23727 from MaxGekk/system-nanotime.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-13 13:12:16 -06:00