Commit graph

217 commits

Author SHA1 Message Date
Kazuaki Ishizaki f31d9a629b [MINOR][DOC][SQL][CORE] Fix typo in document and comments
### What changes were proposed in this pull request?

Fixed typo in `docs` directory and in other directories

1. Find typo in `docs` and apply fixes to files in all directories
2. Fix `the the` -> `the`

### Why are the changes needed?

Better readability of documents

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

No test needed

Closes #26976 from kiszk/typo_20191221.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-12-21 14:08:58 -08:00
Jungtaek Lim (HeartSaVioR) 8384ff4c9d [SPARK-28144][SPARK-29294][SS] Upgrade Kafka to 2.4.0
### What changes were proposed in this pull request?

This patch upgrades the version of Kafka to 2.4, which supports Scala 2.13.

There're some incompatible changes in Kafka 2.4 which the patch addresses as well:

* `ZkUtils` is removed -> Replaced with `KafkaZkClient`
* Majority of methods are removed in `AdminUtils` -> Replaced with `AdminZkClient`
* Method signature of `Scheduler.schedule` is changed (return type) -> leverage `DeterministicScheduler` to avoid implementing `ScheduledFuture`

### Why are the changes needed?

* Kafka 2.4 supports Scala 2.13

### Does this PR introduce any user-facing change?

No, as Kafka API is known to be compatible across versions.

### How was this patch tested?

Existing UTs

Closes #26960 from HeartSaVioR/SPARK-29294.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-12-21 14:01:25 -08:00
Yuming Wang 696288f623 [INFRA] Reverts commit 56dcd79 and c216ef1
### What changes were proposed in this pull request?
1. Revert "Preparing development version 3.0.1-SNAPSHOT": 56dcd79

2. Revert "Preparing Spark release v3.0.0-preview2-rc2": c216ef1

### Why are the changes needed?
Shouldn't change master.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
manual test:
https://github.com/apache/spark/compare/5de5e46..wangyum:revert-master

Closes #26915 from wangyum/revert-master.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
2019-12-16 19:57:44 -07:00
Yuming Wang 56dcd79992 Preparing development version 3.0.1-SNAPSHOT 2019-12-17 01:57:27 +00:00
Yuming Wang c216ef1d03 Preparing Spark release v3.0.0-preview2-rc2 2019-12-17 01:57:21 +00:00
Jungtaek Lim (HeartSaVioR) 94eb66593a [SPARK-30227][SQL] Add close() on DataWriter interface
### What changes were proposed in this pull request?

This patch adds close() method to the DataWriter interface, which will become the place to cleanup the resource.

### Why are the changes needed?

The lifecycle of DataWriter instance ends at either commit() or abort(). That makes datasource implementors to feel they can place resource cleanup in both sides, but abort() can be called when commit() fails; so they have to ensure they don't do double-cleanup if cleanup is not idempotent.

### Does this PR introduce any user-facing change?

Depends on the definition of user; if they're developers of custom DSv2 source, they have to add close() in their DataWriter implementations. It's OK to just add close() with empty content as they should have already dealt with resource cleanup in commit/abort, but they would love to migrate the resource cleanup logic to close() as it avoids double cleanup. If they're just end users using the provided DSv2 source (regardless of built-in/3rd party), no change.

### How was this patch tested?

Existing tests.

Closes #26855 from HeartSaVioR/SPARK-30227.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-12-13 16:12:41 +08:00
Shixiong Zhu cfd7ca9a06
Revert "[SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer"
This reverts commit 3641c3dd69.
2019-12-10 13:38:38 -08:00
angerszhu da27f91560 [SPARK-29957][TEST] Reset MiniKDC's default enctypes to fit jdk8/jdk11
### What changes were proposed in this pull request?

Hadoop jira: https://issues.apache.org/jira/browse/HADOOP-12911
In this jira, the author said to replace origin Apache Directory project which is not maintained (but not said it won't work well in jdk11) to Apache Kerby which is java binding(fit java version).

And in Flink: https://github.com/apache/flink/pull/9622
Author show the reason why hadoop-2.7.2's  `MminiKdc` failed with jdk11.
Because new encryption types of `es128-cts-hmac-sha256-128` and `aes256-cts-hmac-sha384-192` (for Kerberos 5) enabled by default were added in Java 11.
Spark with `hadoop-2.7's MiniKdc`does not support these encryption types and does not work well when these encryption types are enabled, which results in the authentication failure.

And when I test hadoop-2.7.2's minikdc in local, the kerberos 's debug error message is  read message stream failed, message can't match.

### Why are the changes needed?
Support jdk11 with hadoop-2.7

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
Existed UT

Closes #26594 from AngersZhuuuu/minikdc-3.2.0.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-12-05 23:12:45 -08:00
Ximo Guanter 54c5087a3a [SPARK-29248][SQL] provider number of partitions when creating v2 data writer factory
### What changes were proposed in this pull request?
When implementing a ScanBuilder, we require the implementor to provide the schema of the data and the number of partitions.

However, when someone is implementing WriteBuilder we only pass them the schema, but not the number of partitions. This is an asymetrical developer experience.

This PR adds a PhysicalWriteInfo interface that is passed to createBatchWriterFactory and createStreamingWriterFactory that adds the number of partitions of the data that is going to be written.

### Why are the changes needed?
Passing in the number of partitions on the WriteBuilder would enable data sources to provision their write targets before starting to write. For example:

it could be used to provision a Kafka topic with a specific number of partitions
it could be used to scale a microservice prior to sending the data to it
it could be used to create a DsV2 that sends the data to another spark cluster (currently not possible since the reader wouldn't be able to know the number of partitions)
### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Tests passed

Closes #26591 from edrevo/temp.

Authored-by: Ximo Guanter <joaquin.guantergonzalbez@telefonica.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-22 00:19:25 +08:00
Sean Owen 1febd373ea [MINOR][TESTS] Replace JVM assert with JUnit Assert in tests
### What changes were proposed in this pull request?

Use JUnit assertions in tests uniformly, not JVM assert() statements.

### Why are the changes needed?

assert() statements do not produce as useful errors when they fail, and, if they were somehow disabled, would fail to test anything.

### Does this PR introduce any user-facing change?

No. The assertion logic should be identical.

### How was this patch tested?

Existing tests.

Closes #26581 from srowen/assertToJUnit.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-11-20 14:04:15 -06:00
Gabor Somogyi 3641c3dd69 [SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer
### What changes were proposed in this pull request?

Kafka producers are now closed when `spark.kafka.producer.cache.timeout` reached which could be significant problem when processing big SQL queries. The workaround was to increase `spark.kafka.producer.cache.timeout` to a number where the biggest SQL query can be finished.

In this PR I've adapted similar solution which already exists on the consumer side, namely applies Apache Commons Pool on the producer side as well. Main advantages choosing this solution:
* Producers are not closed until they're in use
* No manual reference counting needed (which may be error prone)
* Thread-safe by design
* Provides jmx connection to the pool where metrics can be fetched

What this PR contains:
* Introduced producer side parameters to configure pool
* Renamed `InternalKafkaConsumerPool` to `InternalKafkaConnectorPool` and made it abstract
* Created 2 implementations from it: `InternalKafkaConsumerPool` and `InternalKafkaProducerPool`
* Adapted `CachedKafkaProducer` to use `InternalKafkaProducerPool`
* Changed `KafkaDataWriter` and `KafkaDataWriteTask` to release producer even in failure scenario
* Added several new tests
* Extended `KafkaTest` to clear not only producers but consumers as well
* Renamed `InternalKafkaConsumerPoolSuite` to `InternalKafkaConnectorPoolSuite` where only consumer tests are checking the behavior (please see comment for reasoning)

What this PR not yet contains(but intended when the main concept is stable):
* User facing documentation

### Why are the changes needed?
Kafka producer closed after 10 minutes (with default settings).

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing + additional unit tests.
Cluster tests being started.

Closes #25853 from gaborgsomogyi/SPARK-21869.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-11-07 17:06:32 -08:00
Jungtaek Lim (HeartSaVioR) 252ecd333f [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink
### What changes were proposed in this pull request?

This patch leverages V2 continuous memory stream to extract tests from Kafka micro-batch sink suite and continuous sink suite and deduplicate them. These tests are basically doing the same, except how to run and verify the result.

### Why are the changes needed?

We no longer have same tests spotted on two places - brings 300 lines deletion.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UTs.

Closes #26292 from HeartSaVioR/SPARK-29635.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-11-06 17:08:42 -08:00
Xingbo Jiang 8207c835b4 Revert "Prepare Spark release v3.0.0-preview-rc2"
This reverts commit 007c873ae3.
2019-10-30 17:45:44 -07:00
Xingbo Jiang 007c873ae3 Prepare Spark release v3.0.0-preview-rc2
### What changes were proposed in this pull request?

To push the built jars to maven release repository, we need to remove the 'SNAPSHOT' tag from the version name.

Made the following changes in this PR:
* Update all the `3.0.0-SNAPSHOT` version name to `3.0.0-preview`
* Update the sparkR version number check logic to allow jvm version like `3.0.0-preview`

**Please note those changes were generated by the release script in the past, but this time since we manually add tags on master branch, we need to manually apply those changes too.**

We shall revert the changes after 3.0.0-preview release passed.

### Why are the changes needed?

To make the maven release repository to accept the built jars.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

N/A
2019-10-30 17:42:59 -07:00
Xingbo Jiang b33a58c0c6 Revert "Prepare Spark release v3.0.0-preview-rc1"
This reverts commit 5eddbb5f1d.
2019-10-28 22:32:34 -07:00
Xingbo Jiang 5eddbb5f1d Prepare Spark release v3.0.0-preview-rc1
### What changes were proposed in this pull request?

To push the built jars to maven release repository, we need to remove the 'SNAPSHOT' tag from the version name.

Made the following changes in this PR:
* Update all the `3.0.0-SNAPSHOT` version name to `3.0.0-preview`
* Update the PySpark version from `3.0.0.dev0` to `3.0.0`

**Please note those changes were generated by the release script in the past, but this time since we manually add tags on master branch, we need to manually apply those changes too.**

We shall revert the changes after 3.0.0-preview release passed.

### Why are the changes needed?

To make the maven release repository to accept the built jars.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

N/A

Closes #26243 from jiangxb1987/3.0.0-preview-prepare.

Lead-authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
2019-10-28 22:31:29 -07:00
Jungtaek Lim (HeartSaVioR) 762db39c15 [SPARK-29509][SQL][SS] Deduplicate codes from Kafka data source
### What changes were proposed in this pull request?

This patch deduplicates code blocks in Kafka data source which are being repeated multiple times in a method.

### Why are the changes needed?

This change would simplify the code and open possibility to simplify future code whenever fields are added to Kafka writer schema.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UTs.

Closes #26158 from HeartSaVioR/MINOR-deduplicate-kafka-source.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-28 11:14:18 -07:00
Gabor Somogyi 25493919f8 [SPARK-29580][TESTS] Add kerberos debug messages for Kafka secure tests
### What changes were proposed in this pull request?
`org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite` failed lately. After had a look at the logs it just shows the following fact without any details:
```
Caused by: sbt.ForkMain$ForkError: sun.security.krb5.KrbException: Server not found in Kerberos database (7) - Server not found in Kerberos database
```
Since the issue is intermittent and not able to reproduce it we should add more debug information and wait for reproduction with the extended logs.

### Why are the changes needed?
Failing test doesn't give enough debug information.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
I've started the test manually and checked that such additional debug messages show up:
```
>>> KrbApReq: APOptions are 00000000 00000000 00000000 00000000
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Looking for keys for: kafka/localhostEXAMPLE.COM
Added key: 17version: 0
Added key: 23version: 0
Added key: 16version: 0
Found unsupported keytype (3) for kafka/localhostEXAMPLE.COM
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Using builtin default etypes for permitted_enctypes
default etypes for permitted_enctypes: 17 16 23.
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
MemoryCache: add 1571936500/174770/16C565221B70AAB2BEFE31A83D13A2F4/client/localhostEXAMPLE.COM to client/localhostEXAMPLE.COM|kafka/localhostEXAMPLE.COM
MemoryCache: Existing AuthList:
#3: 1571936493/200803/8CD70D280B0862C5DA1FF901ECAD39FE/client/localhostEXAMPLE.COM
#2: 1571936499/985009/BAD33290D079DD4E3579A8686EC326B7/client/localhostEXAMPLE.COM
#1: 1571936499/995208/B76B9D78A9BE283AC78340157107FD40/client/localhostEXAMPLE.COM
```

Closes #26252 from gaborgsomogyi/SPARK-29580.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-25 14:11:35 -07:00
redsk 8bd8f492ea [SPARK-29500][SQL][SS] Support partition column when writing to Kafka
### What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-29500

`KafkaRowWriter` now supports setting the Kafka partition by reading a "partition" column in the input dataframe.

Code changes in commit nr. 1.
Test changes in commit nr. 2.
Doc changes in commit nr. 3.

tcondie dongjinleekr srowen

### Why are the changes needed?
While it is possible to configure a custom Kafka Partitioner with
`.option("kafka.partitioner.class", "my.custom.Partitioner")`, this is not enough for certain use cases. See the Jira issue.

### Does this PR introduce any user-facing change?
No, as this behaviour is optional.

### How was this patch tested?
Two new UT were added and one was updated.

Closes #26153 from redsk/feature/SPARK-29500.

Authored-by: redsk <nicola.bova@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-10-25 08:06:36 -05:00
Sean Owen 2d871ad0e7 [SPARK-29392][CORE][SQL][STREAMING] Remove symbol literal syntax 'foo, deprecated in Scala 2.13, in favor of Symbol("foo")
### What changes were proposed in this pull request?

Syntax like `'foo` is deprecated in Scala 2.13. Replace usages with `Symbol("foo")`

### Why are the changes needed?

Avoids ~50 deprecation warnings when attempting to build with 2.13.

### Does this PR introduce any user-facing change?

None, should be no functional change at all.

### How was this patch tested?

Existing tests.

Closes #26061 from srowen/SPARK-29392.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-08 20:15:37 -07:00
Gabor Somogyi 6b5e0e2469 [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
### What changes were proposed in this pull request?
Kafka consumers are cached. If delegation token is used and the token is expired, then exception is thrown. Such case new consumer is created in a Task retry with the latest delegation token. This can be enhanced by detecting the existence of a new delegation token. In this PR I'm detecting whether the token in the consumer is the same as the latest stored in the `UGI` (`targetServersRegex` must match not to create a consumer with another cluster's token).

### Why are the changes needed?
It would be good to avoid Task retry to pick up the latest delegation token.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing + new unit tests.
Additionally executed the following code snippet to measure `ensureConsumerHasLatestToken` time consumption:
```
    val startTimeNs = System.nanoTime()
    for (i <- 0 until 10000) {
      consumer.ensureConsumerHasLatestToken()
    }
    logInfo(s"It took ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms" +
      " to call ensureConsumerHasLatestToken 10000 times")
```

And here are the results:
```
19/09/11 14:58:22 INFO KafkaDataConsumerSuite: It took 1058 ms to call ensureConsumerHasLatestToken 10000 times
...
19/09/11 14:58:23 INFO KafkaDataConsumerSuite: It took 780 ms to call ensureConsumerHasLatestToken 10000 times
...
19/09/11 15:12:11 INFO KafkaDataConsumerSuite: It took 1032 ms to call ensureConsumerHasLatestToken 10000 times
...
19/09/11 15:12:11 INFO KafkaDataConsumerSuite: It took 679 ms to call ensureConsumerHasLatestToken 10000 times
```

Closes #25760 from gaborgsomogyi/SPARK-29054.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-03 09:34:31 -07:00
Sean Owen e1ea806b30 [SPARK-29291][CORE][SQL][STREAMING][MLLIB] Change procedure-like declaration to function + Unit for 2.13
### What changes were proposed in this pull request?

Scala 2.13 emits a deprecation warning for procedure-like declarations:

```
def foo() {
 ...
```

This is equivalent to the following, so should be changed to avoid a warning:

```
def foo(): Unit = {
  ...
```

### Why are the changes needed?

It will avoid about a thousand compiler warnings when we start to support Scala 2.13. I wanted to make the change in 3.0 as there are less likely to be back-ports from 3.0 to 2.4 than 3.1 to 3.0, for example, minimizing that downside to touching so many files.

Unfortunately, that makes this quite a big change.

### Does this PR introduce any user-facing change?

No behavior change at all.

### How was this patch tested?

Existing tests.

Closes #25968 from srowen/SPARK-29291.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-30 10:03:23 -07:00
Burak Yavuz c8159c7941 [SPARK-29197][SQL] Remove saveModeForDSV2 from DataFrameWriter
### What changes were proposed in this pull request?

It is very confusing that the default save mode is different between the internal implementation of a Data source. The reason that we had to have saveModeForDSV2 was that there was no easy way to check the existence of a Table in DataSource v2. Now, we have catalogs for that. Therefore we should be able to remove the different save modes. We also have a plan forward for `save`, where we can't really check the existence of a table, and therefore create one. That will come in a future PR.

### Why are the changes needed?

Because it is confusing that the internal implementation of a data source (which is generally non-obvious to users) decides which default save mode is used within Spark.

### Does this PR introduce any user-facing change?

It changes the default save mode for V2 Tables in the DataFrameWriter APIs

### How was this patch tested?

Existing tests

Closes #25876 from brkyvz/removeSM.

Lead-authored-by: Burak Yavuz <brkyvz@gmail.com>
Co-authored-by: Burak Yavuz <burak@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-09-26 15:20:04 +08:00
Jungtaek Lim (HeartSaVioR) 4513f1c0dc [SPARK-26848][SQL][SS] Introduce new option to Kafka source: offset by timestamp (starting/ending)
## What changes were proposed in this pull request?

This patch introduces new options "startingOffsetsByTimestamp" and "endingOffsetsByTimestamp" to set specific timestamp per topic (since we're unlikely to set the different value per partition) to let source starts reading from offsets which have equal of greater timestamp, and ends reading until offsets which have equal of greater timestamp.

The new option would be optional of course, and take preference over existing offset options.

## How was this patch tested?

New unit tests added. Also manually tested basic functionality with Kafka 2.0.0 server.

Running query below

```
val df = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "spark_26848_test_v1,spark_26848_test_2_v1")
  .option("startingOffsetsByTimestamp", """{"spark_26848_test_v1": 1549669142193, "spark_26848_test_2_v1": 1549669240965}""")
  .option("endingOffsetsByTimestamp", """{"spark_26848_test_v1": 1549669265676, "spark_26848_test_2_v1": 1549699265676}""")
  .load().selectExpr("CAST(value AS STRING)")

df.show()
```

with below records (one string which number part remarks when they're put after such timestamp) in

topic `spark_26848_test_v1`
```
hello1 1549669142193
world1 1549669142193
hellow1 1549669240965
world1 1549669240965
hello1 1549669265676
world1 1549669265676
```

topic `spark_26848_test_2_v1`

```
hello2 1549669142193
world2 1549669142193
hello2 1549669240965
world2 1549669240965
hello2 1549669265676
world2 1549669265676
```

the result of `df.show()` follows:
```
+--------------------+
|               value|
+--------------------+
|world1 1549669240965|
|world1 1549669142193|
|world2 1549669240965|
|hello2 1549669240965|
|hellow1 154966924...|
|hello2 1549669265676|
|hello1 1549669142193|
|world2 1549669265676|
+--------------------+
```

Note that endingOffsets (as well as endingOffsetsByTimestamp) are exclusive.

Closes #23747 from HeartSaVioR/SPARK-26848.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-23 19:25:36 -05:00
Gabor Somogyi 71e7516132 [SPARK-29027][TESTS] KafkaDelegationTokenSuite fix when loopback canonical host name differs from localhost
### What changes were proposed in this pull request?
`KafkaDelegationTokenSuite` fails on different platforms with the following problem:
```
19/09/11 11:07:42.690 pool-1-thread-1-SendThread(localhost:44965) DEBUG ZooKeeperSaslClient: creating sasl client: Client=zkclient/localhostEXAMPLE.COM;service=zookeeper;serviceHostname=localhost.localdomain
...
NIOServerCxn.Factory:localhost/127.0.0.1:0: Zookeeper Server failed to create a SaslServer to interact with a client during session initiation:
javax.security.sasl.SaslException: Failure to initialize security context [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos credentails)]
	at com.sun.security.sasl.gsskerb.GssKrb5Server.<init>(GssKrb5Server.java:125)
	at com.sun.security.sasl.gsskerb.FactoryImpl.createSaslServer(FactoryImpl.java:85)
	at javax.security.sasl.Sasl.createSaslServer(Sasl.java:524)
	at org.apache.zookeeper.util.SecurityUtils$2.run(SecurityUtils.java:233)
	at org.apache.zookeeper.util.SecurityUtils$2.run(SecurityUtils.java:229)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.zookeeper.util.SecurityUtils.createSaslServer(SecurityUtils.java:228)
	at org.apache.zookeeper.server.ZooKeeperSaslServer.createSaslServer(ZooKeeperSaslServer.java:44)
	at org.apache.zookeeper.server.ZooKeeperSaslServer.<init>(ZooKeeperSaslServer.java:38)
	at org.apache.zookeeper.server.NIOServerCnxn.<init>(NIOServerCnxn.java:100)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.createConnection(NIOServerCnxnFactory.java:186)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:227)
	at java.lang.Thread.run(Thread.java:748)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos credentails)
	at sun.security.jgss.krb5.Krb5AcceptCredential.getInstance(Krb5AcceptCredential.java:87)
	at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:127)
	at sun.security.jgss.GSSManagerImpl.getCredentialElement(GSSManagerImpl.java:193)
	at sun.security.jgss.GSSCredentialImpl.add(GSSCredentialImpl.java:427)
	at sun.security.jgss.GSSCredentialImpl.<init>(GSSCredentialImpl.java:62)
	at sun.security.jgss.GSSManagerImpl.createCredential(GSSManagerImpl.java:154)
	at com.sun.security.sasl.gsskerb.GssKrb5Server.<init>(GssKrb5Server.java:108)
	... 13 more
NIOServerCxn.Factory:localhost/127.0.0.1:0: Client attempting to establish new session at /127.0.0.1:33742
SyncThread:0: Creating new log file: log.1
SyncThread:0: Established session 0x100003736ae0000 with negotiated timeout 10000 for client /127.0.0.1:33742
pool-1-thread-1-SendThread(localhost:35625): Session establishment complete on server localhost/127.0.0.1:35625, sessionid = 0x100003736ae0000, negotiated timeout = 10000
pool-1-thread-1-SendThread(localhost:35625): ClientCnxn:sendSaslPacket:length=0
pool-1-thread-1-SendThread(localhost:35625): saslClient.evaluateChallenge(len=0)
pool-1-thread-1-EventThread: zookeeper state changed (SyncConnected)
NioProcessor-1: No server entry found for kerberos principal name zookeeper/localhost.localdomainEXAMPLE.COM
NioProcessor-1: No server entry found for kerberos principal name zookeeper/localhost.localdomainEXAMPLE.COM
NioProcessor-1: Server not found in Kerberos database (7)
NioProcessor-1: Server not found in Kerberos database (7)
```

The problem reproducible if the `localhost` and `localhost.localdomain` order exhanged:
```
[systestgsomogyi-build spark]$ cat /etc/hosts
127.0.0.1   localhost.localdomain localhost localhost4 localhost4.localdomain4
::1         localhost.localdomain localhost localhost6 localhost6.localdomain6
```

The main problem is that `ZkClient` connects to the canonical loopback address (which is not necessarily `localhost`).

### Why are the changes needed?
`KafkaDelegationTokenSuite` failed in some environments.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing unit tests on different platforms.

Closes #25803 from gaborgsomogyi/SPARK-29027.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-17 15:30:18 -07:00
Jungtaek Lim (HeartSaVioR) 88c8d5eed2 [SPARK-23539][SS][FOLLOWUP][TESTS] Add UT to ensure existing query doesn't break with default conf of includeHeaders
### What changes were proposed in this pull request?

This patch adds new UT to ensure existing query (before Spark 3.0.0) with checkpoint doesn't break with default configuration of "includeHeaders" being introduced via SPARK-23539.

This patch also modifies existing test which checks type of columns to also check headers column as well.

### Why are the changes needed?

The patch adds missing tests which guarantees backward compatibility of the change of SPARK-23539.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

UT passed.

Closes #25792 from HeartSaVioR/SPARK-23539-FOLLOWUP.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-16 15:22:04 -05:00
Lee Dongjin 1675d5114e [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
## What changes were proposed in this pull request?

This update adds support for Kafka Headers functionality in Structured Streaming.

## How was this patch tested?

With following unit tests:

- KafkaRelationSuite: "default starting and ending offsets with headers" (new)
- KafkaSinkSuite: "batch - write to kafka" (updated)

Closes #22282 from dongjinleekr/feature/SPARK-23539.

Lead-authored-by: Lee Dongjin <dongjin@apache.org>
Co-authored-by: Jungtaek Lim <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-13 12:31:28 -05:00
Wenchen Fan 053dd858d3 [SPARK-28998][SQL] reorganize the packages of DS v2 interfaces/classes
### What changes were proposed in this pull request?

reorganize the packages of DS v2 interfaces/classes:
1. `org.spark.sql.connector.catalog`: put `TableCatalog`, `Table` and other related interfaces/classes
2. `org.spark.sql.connector.expression`: put `Expression`, `Transform` and other related interfaces/classes
3. `org.spark.sql.connector.read`: put `ScanBuilder`, `Scan` and other related interfaces/classes
4. `org.spark.sql.connector.write`: put `WriteBuilder`, `BatchWrite` and other related interfaces/classes

### Why are the changes needed?

Data Source V2 has evolved a lot. It's a bit weird that `Expression` is in `org.spark.sql.catalog.v2` and `Table` is in `org.spark.sql.sources.v2`.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

existing tests

Closes #25700 from cloud-fan/package.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-09-12 19:59:34 +08:00
Gabor Somogyi e516f7e09e [SPARK-28928][SS] Use Kafka delegation token protocol on sources/sinks
### What changes were proposed in this pull request?
At the moment there are 3 places where communication protocol with Kafka cluster has to be set when delegation token used:
* On delegation token
* On source
* On sink

Most of the time users are using the same protocol on all these places (within one Kafka cluster). It would be better to declare it in one place (delegation token side) and Kafka sources/sinks can take this config over.

In this PR I've I've modified the code in a way that Kafka sources/sinks are taking over delegation token side `security.protocol` configuration when the token and the source/sink matches in `bootstrap.servers` configuration. This default configuration can be overwritten on each source/sink independently by using `kafka.security.protocol` configuration.

### Why are the changes needed?
The actual configuration's default behavior represents the minority of the use-cases and inconvenient.

### Does this PR introduce any user-facing change?
Yes, with this change users need to provide less configuration parameters by default.

### How was this patch tested?
Existing + additional unit tests.

Closes #25631 from gaborgsomogyi/SPARK-28928.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-09 15:41:51 -07:00
Jungtaek Lim (HeartSaVioR) 594c9c5a3e [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
## What changes were proposed in this pull request?

This patch does pooling for both kafka consumers as well as fetched data. The overall benefits of the patch are following:

* Both pools support eviction on idle objects, which will help closing invalid idle objects which topic or partition are no longer be assigned to any tasks.
* It also enables applying different policies on pool, which helps optimization of pooling for each pool.
* We concerned about multiple tasks pointing same topic partition as well as same group id, and existing code can't handle this hence excess seek and fetch could happen. This patch properly handles the case.
* It also makes the code always safe to leverage cache, hence no need to maintain reuseCache parameter.

Moreover, pooling kafka consumers is implemented based on Apache Commons Pool, which also gives couple of benefits:

* We can get rid of synchronization of KafkaDataConsumer object while acquiring and returning InternalKafkaConsumer.
* We can extract the feature of object pool to outside of the class, so that the behaviors of the pool can be tested easily.
* We can get various statistics for the object pool, and also be able to enable JMX for the pool.

FetchedData instances are pooled by custom implementation of pool instead of leveraging Apache Commons Pool, because they have CacheKey as first key and "desired offset" as second key which "desired offset" is changing - I haven't found any general pool implementations supporting this.

This patch brings additional dependency, Apache Commons Pool 2.6.0 into `spark-sql-kafka-0-10` module.

## How was this patch tested?

Existing unit tests as well as new tests for object pool.

Also did some experiment regarding proving concurrent access of consumers for same topic partition.

* Made change on both sides (master and patch) to log when creating Kafka consumer or fetching records from Kafka is happening.
* branches
  * master: https://github.com/HeartSaVioR/spark/tree/SPARK-25151-master-ref-debugging
  * patch: https://github.com/HeartSaVioR/spark/tree/SPARK-25151-debugging
* Test query (doing self-join)
  * https://gist.github.com/HeartSaVioR/d831974c3f25c02846f4b15b8d232cc2
* Ran query from spark-shell, with using `local[*]` to maximize the chance to have concurrent access
* Collected the count of fetch requests on Kafka via command: `grep "creating new Kafka consumer" logfile | wc -l`
* Collected the count of creating Kafka consumers via command: `grep "fetching data from Kafka consumer" logfile | wc -l`

Topic and data distribution is follow:

```
truck_speed_events_stream_spark_25151_v1:0:99440
truck_speed_events_stream_spark_25151_v1:1:99489
truck_speed_events_stream_spark_25151_v1:2:397759
truck_speed_events_stream_spark_25151_v1:3:198917
truck_speed_events_stream_spark_25151_v1:4:99484
truck_speed_events_stream_spark_25151_v1:5:497320
truck_speed_events_stream_spark_25151_v1:6:99430
truck_speed_events_stream_spark_25151_v1:7:397887
truck_speed_events_stream_spark_25151_v1:8:397813
truck_speed_events_stream_spark_25151_v1:9:0
```

The experiment only used smallest 4 partitions (0, 1, 4, 6) from these partitions to finish the query earlier.

The result of experiment is below:

branch | create Kafka consumer | fetch request
-- | -- | --
master | 1986 | 2837
patch | 8 | 1706

Closes #22138 from HeartSaVioR/SPARK-25151.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: Jungtaek Lim <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-04 10:17:38 -07:00
Gabor Somogyi 7d72c073dd [SPARK-28760][SS][TESTS] Add Kafka delegation token end-to-end test with mini KDC
### What changes were proposed in this pull request?
At the moment no end-to-end Kafka delegation token test exists which was mainly because of missing embedded KDC. KDC is missing in general from the testing side so I've discovered what kind of possibilities are there. The most obvious choice is the MiniKDC inside the Hadoop library where Apache Kerby runs in the background. What this PR contains:
* Added MiniKDC as test dependency from Hadoop
* Added `maven-bundle-plugin` because couple of dependencies are coming in bundle format
* Added security mode to `KafkaTestUtils`. Namely start KDC -> start Zookeeper in secure mode -> start Kafka in secure mode
* Added a roundtrip test (saves and reads back data from Kafka)

### Why are the changes needed?
No such test exists + security testing with KDC is completely missing.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing + additional unit tests.
I've put the additional test into a loop and was consuming ~10 sec average.

Closes #25477 from gaborgsomogyi/SPARK-28760.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-29 11:52:35 -07:00
Wenchen Fan cb06209fc9 [SPARK-28747][SQL] merge the two data source v2 fallback configs
## What changes were proposed in this pull request?

Currently we have 2 configs to specify which v2 sources should fallback to v1 code path. One config for read path, and one config for write path.

However, I found it's awkward to work with these 2 configs:
1. for `CREATE TABLE USING format`, should this be read path or write path?
2. for `V2SessionCatalog.loadTable`,  we need to return `UnresolvedTable` if it's a DS v1 or we need to fallback to v1 code path. However, at that time, we don't know if the returned table will be used for read or write.

We don't have any new features or perf improvement in file source v2. The fallback API is just a safeguard if we have bugs in v2 implementations. There are not many benefits to support falling back to v1 for read and write path separately.

This PR proposes to merge these 2 configs into one.

## How was this patch tested?

existing tests

Closes #25465 from cloud-fan/merge-conf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-27 20:47:24 +08:00
Jungtaek Lim (HeartSaVioR) 64032cb01f [MINOR][SS] Reuse KafkaSourceInitialOffsetWriter to deduplicate
### What changes were proposed in this pull request?

This patch proposes to reuse KafkaSourceInitialOffsetWriter to remove identical code in KafkaSource.

Credit to jaceklaskowski for finding this.
https://lists.apache.org/thread.html/7faa6ac29d871444eaeccefc520e3543a77f4362af4bb0f12a3f7cb2%3Cdev.spark.apache.org%3E

### Why are the changes needed?

The code is duplicated with identical code, which opens the chance to maintain the code separately and might end up with bugs not addressed one side.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing UTs, as it's simple refactor.

Closes #25583 from HeartSaVioR/MINOR-SS-reuse-KafkaSourceInitialOffsetWriter.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-26 18:06:18 -07:00
Gabor Somogyi b205269ae0 [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used
### What changes were proposed in this pull request?
When Task retry happens with Kafka source then it's not known whether the consumer is the issue so the old consumer removed from cache and new consumer created. The feature works fine but not covered with tests.

In this PR I've added such test for DStreams + Structured Streaming.

### Why are the changes needed?
No such tests are there.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing + new unit tests.

Closes #25582 from gaborgsomogyi/SPARK-28875.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-26 13:12:14 -07:00
Wenchen Fan 97dc4c0bfc [SPARK-28744][SQL][TEST] rename SharedSQLContext to SharedSparkSession
## What changes were proposed in this pull request?

The Spark SQL test framework needs to support 2 kinds of tests:
1. tests inside Spark to test Spark itself (extends `SparkFunSuite`)
2. test outside of Spark to test Spark applications (introduced at b57ed2245c)

The class hierarchy of the major testing traits:
![image](https://user-images.githubusercontent.com/3182036/63088526-c0f0af80-bf87-11e9-9bed-c144c2486da9.png)

`PlanTestBase`, `SQLTestUtilsBase` and `SharedSparkSession` intentionally don't extend `SparkFunSuite`, so that they can be used for tests outside of Spark. Tests in Spark should extends `QueryTest` and/or `SharedSQLContext` in most cases.

However, the name is a little confusing. As a result, some test suites extend `SharedSparkSession` instead of `SharedSQLContext`. `SharedSparkSession` doesn't work well with `SparkFunSuite` as it doesn't have the special handling of thread auditing in `SharedSQLContext`. For example, you will see a warning starting with `===== POSSIBLE THREAD LEAK IN SUITE` when you run `DataFrameSelfJoinSuite`.

This PR proposes to rename `SharedSparkSession` to `SharedSparkSessionBase`, and rename `SharedSQLContext` to `SharedSparkSession`.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #25463 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-19 19:01:56 +08:00
Gabor Somogyi a493031e2e [SPARK-28695][SS] Use CaseInsensitiveMap in KafkaSourceProvider to make source param handling more robust
## What changes were proposed in this pull request?

[SPARK-28163](https://issues.apache.org/jira/browse/SPARK-28163) fixed a bug and during the analysis we've concluded it would be more robust to use `CaseInsensitiveMap` inside Kafka source. This case less lower/upper case problem would rise in the future.

Please note this PR doesn't intend to solve any kind of actual problem but finish the concept added in [SPARK-28163](https://issues.apache.org/jira/browse/SPARK-28163) (in a fix PR I didn't want to add too invasive changes). In this PR I've changed `Map[String, String]` to `CaseInsensitiveMap[String]` to enforce the usage. These are the main use-cases:
* `contains` => `CaseInsensitiveMap` solves it
* `get...` => `CaseInsensitiveMap` solves it
* `filter` => keys must be converted to lowercase because there is no guarantee that the incoming map has such key set
* `find` => keys must be converted to lowercase because there is no guarantee that the incoming map has such key set
* passing parameters to Kafka consumer/producer => keys must be converted to lowercase because there is no guarantee that the incoming map has such key set

## How was this patch tested?

Existing unit tests.

Closes #25418 from gaborgsomogyi/SPARK-28695.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-15 14:43:52 +08:00
Gabor Somogyi 5663386f4b [SPARK-28163][SS] Use CaseInsensitiveMap for KafkaOffsetReader
## What changes were proposed in this pull request?

There are "unsafe" conversions in the Kafka connector.
`CaseInsensitiveStringMap` comes in which is then converted the following way:
```
...
options.asScala.toMap
...
```
The main problem with this is that such case it looses its case insensitive nature
(case insensitive map is converting the key to lower case when get/contains called).

In this PR I'm using `CaseInsensitiveMap` to solve this problem.

## How was this patch tested?

Existing + additional unit tests.

Closes #24967 from gaborgsomogyi/SPARK-28163.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-09 17:08:11 +08:00
Shixiong Zhu b9c2521de2 [SPARK-28489][SS] Fix a bug that KafkaOffsetRangeCalculator.getRanges may drop offsets
## What changes were proposed in this pull request?

`KafkaOffsetRangeCalculator.getRanges` may drop offsets due to round off errors. The test added in this PR is one example.

This PR rewrites the logic in `KafkaOffsetRangeCalculator.getRanges` to ensure it never drops offsets.

## How was this patch tested?

The regression test.

Closes #25237 from zsxwing/fix-range.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-26 00:10:56 -07:00
Jungtaek Lim (HeartSaVioR) 7548a8826d [SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users
## What changes were proposed in this pull request?

This patch proposes moving all Trigger implementations to `Triggers.scala`, to avoid exposing these implementations to the end users and let end users only deal with `Trigger.xxx` static methods. This fits the intention of deprecation of `ProcessingTIme`, and we agree to move others without deprecation as this patch will be shipped in major version (Spark 3.0.0).

## How was this patch tested?

UTs modified to work with newly introduced class.

Closes #24996 from HeartSaVioR/SPARK-28199.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-07-14 14:46:01 -05:00
wangguangxin.cn 42b80ae128 [SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
## What changes were proposed in this pull request?

There are some hardcoded configs, using config entry to replace them.

## How was this patch tested?

Existing UT

Closes #25059 from WangGuangxin/ConfigEntry.

Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-11 22:36:07 -07:00
Jungtaek Lim (HeartSaVioR) 4212a30883 [SPARK-28142][SS][TEST][FOLLOWUP] Add configuration check test on Kafka continuous stream
## What changes were proposed in this pull request?

This patch adds missing UT which tests the changed behavior of original patch #24942.

## How was this patch tested?

Newly added UT.

Closes #24999 from HeartSaVioR/SPARK-28142-FOLLOWUP.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-03 21:58:35 -07:00
Gabor Somogyi a006c85077 [SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
## What changes were proposed in this pull request?

According to the documentation `groupIdPrefix` should be available for `streaming and batch`.
It is not the case because the batch part is missing.

In this PR I've added:
* Structured Streaming test for v1 and v2 to cover `groupIdPrefix`
* Batch test for v1 and v2 to cover `groupIdPrefix`
* Added `groupIdPrefix` usage in batch

## How was this patch tested?

Additional + existing unit tests.

Closes #25030 from gaborgsomogyi/SPARK-28232.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-02 20:37:52 +08:00
Gabor Somogyi 0a4f985ca0 [SPARK-23098][SQL] Migrate Kafka Batch source to v2.
## What changes were proposed in this pull request?

Kafka batch data source is using v1 at the moment. In the PR I've migrated to v2. Majority of the change is moving code.

What this PR contains:
* useV1Sources usage fixed in `DataFrameReader` and `DataFrameWriter`
* `KafkaBatch` added to handle DSv2 batch reading
* `KafkaBatchWrite` added to handle DSv2 batch writing
* `KafkaBatchPartitionReader` extracted to share between batch and microbatch
* `KafkaDataWriter` extracted to share between batch, microbatch and continuous
* Batch related source/sink tests are now executing on v1 and v2 connectors
* Couple of classes hidden now, functions moved + couple of minor fixes

## How was this patch tested?

Existing + added unit tests.

Closes #24738 from gaborgsomogyi/SPARK-23098.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-02 09:47:30 +08:00
Jungtaek Lim (HeartSaVioR) 85e95b7d27 [SPARK-28142][SS] Use CaseInsensitiveStringMap for KafkaContinuousStream
## What changes were proposed in this pull request?

This patch addresses a missing spot which Map should be passed as CaseInsensitiveStringMap - KafkaContinuousStream seems to be only the missed one.

Before this fix, it has a relevant bug where `pollTimeoutMs` is always set to default value, as the value of `KafkaSourceProvider.CONSUMER_POLL_TIMEOUT` is `kafkaConsumer.pollTimeoutMs` which key-lowercased map has been provided as `sourceOptions`.

## How was this patch tested?

N/A.

Closes #24942 from HeartSaVioR/MINOR-use-case-insensitive-map-for-kafka-continuous-source.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-24 22:20:32 +09:00
Gabor Somogyi 911fadf33a [SPARK-27748][SS] Kafka consumer/producer password/token redaction.
## What changes were proposed in this pull request?

Kafka parameters are logged at several places and the following parameters has to be redacted:
* Delegation token
* `ssl.truststore.password`
* `ssl.keystore.password`
* `ssl.key.password`

This PR contains:
* Spark central redaction framework used to redact passwords (`spark.redaction.regex`)
* Custom redaction added to handle `sasl.jaas.config` (delegation token)
* Redaction code added into consumer/producer code
* Test refactor

## How was this patch tested?

Existing + additional unit tests.

Closes #24627 from gaborgsomogyi/SPARK-27748.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-06-03 15:43:08 -07:00
Gabor Somogyi efa303581a [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
## What changes were proposed in this pull request?

Kafka related Spark parameters has to start with `spark.kafka.` and not with `spark.sql.`. Because of this I've renamed `spark.sql.kafkaConsumerCache.capacity`.

Since Kafka consumer caching is not documented I've added this also.

## How was this patch tested?

Existing + added unit test.

```
cd docs
SKIP_API=1 jekyll build
```
and manual webpage check.

Closes #24590 from gaborgsomogyi/SPARK-27687.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-15 10:42:09 -07:00
hehuiyuan 5a8aad01c2 [SPARK-27343][KAFKA][SS] Avoid hardcoded for spark-sql-kafka-0-10
## What changes were proposed in this pull request?

[SPARK-27343](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27343)

Based on the previous PR: https://github.com/apache/spark/pull/24270

Extracting parameters , building the objects of ConfigEntry.

For example:
for the parameter "spark.kafka.producer.cache.timeout",we build
```
private[kafka010] val PRODUCER_CACHE_TIMEOUT =
    ConfigBuilder("spark.kafka.producer.cache.timeout")
      .doc("The expire time to remove the unused producers.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10m")
```

Closes #24574 from hehuiyuan/hehuiyuan-patch-9.

Authored-by: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-12 10:46:18 -05:00
Wenchen Fan bae5baae52 [SPARK-27642][SS] make v1 offset extends v2 offset
## What changes were proposed in this pull request?

To move DS v2 to the catalyst module, we can't make v2 offset rely on v1 offset, as v1 offset is in sql/core.

## How was this patch tested?

existing tests

Closes #24538 from cloud-fan/offset.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-05-07 23:03:15 -07:00
gengjiaan 8329e7debd [SPARK-27649][SS] Unify the way use 'spark.network.timeout'
## What changes were proposed in this pull request?

For historical reasons, structured streaming still has some old way of use
`spark.network.timeout`
, even though
`org.apache.spark.internal.config.Network.NETWORK_TIMEOUT`
is now available.

## How was this patch tested?
Exists UT.

Closes #24545 from beliefer/unify-spark-network-timeout.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-08 11:43:03 +08:00
Gabor Somogyi 2f55809425 [SPARK-27294][SS] Add multi-cluster Kafka delegation token
## What changes were proposed in this pull request?

The actual implementation doesn't support multi-cluster Kafka connection with delegation token. In this PR I've added this functionality.

What this PR contains:
* New way of configuration
* Multiple delegation token obtain/store/use functionality
* Documentation
* The change works on DStreams also

## How was this patch tested?

Existing + additional unit tests.
Additionally tested on cluster.

Test scenario:

* 2 * 4 node clusters
* The 4-4 nodes are in different kerberos realms
* Cross-Realm trust between the 2 realms
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512
* Artificial exceptions during processing
* Source reads from realm1 sink writes to realm2

Kafka broker settings:

* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

Closes #24305 from gaborgsomogyi/SPARK-27294.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-05-07 11:40:43 -07:00