## What changes were proposed in this pull request?
To change calls to AdminUtils, currently used to create and delete topics in Kafka tests. With this change, it will rely on adminClient, the recommended way from now on.
## How was this patch tested?
I ran all unit tests and they are fine. Since it is already good tested, I thought that changes in the API wouldn't require new tests, as long as the current tests are working fine.
Closes#24071 from DylanGuedes/spark-27138.
Authored-by: DylanGuedes <djmgguedes@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
It's a little awkward to have 2 different classes(`CaseInsensitiveStringMap` and `DataSourceOptions`) to present the options in data source and catalog API.
This PR merges these 2 classes, while keeping the name `CaseInsensitiveStringMap`, which is more precise.
## How was this patch tested?
existing tests
Closes#24025 from cloud-fan/option.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing), the life cycle of `StreamingWrite` should be the same as the read side `MicroBatch/ContinuousStream`, i.e. each run of the stream query, instead of each epoch.
This PR fixes it.
## How was this patch tested?
existing tests
Closes#23981 from cloud-fan/dsv2.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
It adds Kafka delegation token support for DStreams. Please be aware as Kafka native sink is not available for DStreams this PR contains delegation token usage only on consumer side.
What this PR contains:
* Usage of token through dynamic JAAS configuration
* `KafkaConfigUpdater` moved to `kafka-0-10-token-provider`
* `KafkaSecurityHelper` functionality moved into `KafkaTokenUtil`
* Documentation
## How was this patch tested?
Existing unit tests + on cluster.
Long running Kafka to file tests on 4 node cluster with randomly thrown artificial exceptions.
Test scenario:
* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512
Kafka broker settings:
* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)
After each 7.5 minutes new delegation token obtained from Kafka broker (10 min * 0.75).
When token expired after 10 minutes (Spark obtains new one and doesn't renew the old), the brokers expiring thread comes after each 5 minutes (invalidates expired tokens) and artificial exception has been thrown inside the Spark application (such case Spark closes connection), then the latest delegation token picked up correctly.
cd docs/
SKIP_API=1 jekyll build
Manual webpage check.
Closes#23929 from gaborgsomogyi/SPARK-27022.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Similar to `SaveMode`, we should remove streaming `OutputMode` from data source v2 API, and use operations that has clear semantic.
The changes are:
1. append mode: create `StreamingWrite` directly. By default, the `WriteBuilder` will create `Write` to append data.
2. complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and appending new data of the current epoch. `SupportsTruncate` has exactly the same semantic.
3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.
The behavior changes:
1. all the v2 sinks(foreach, console, memory, kafka, noop) don't support update mode. The fact is, previously all the v2 sinks implement the update mode wrong. None of them can really support it.
2. kafka sink doesn't support complete mode. The fact is, the kafka sink can only append data.
## How was this patch tested?
existing tests
Closes#23859 from cloud-fan/update.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Redundant `get` when getting a value from `Map` given a key.
## How was this patch tested?
N/A
Closes#23901 from 10110346/removegetfrommap.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Spark not always picking up the latest Kafka delegation tokens even if a new one properly obtained.
In the PR I'm setting delegation tokens right before `KafkaConsumer` and `KafkaProducer` creation to be on the safe side.
## How was this patch tested?
Long running Kafka to Kafka tests on 4 node cluster with randomly thrown artificial exceptions.
Test scenario:
* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512
Kafka broker settings:
* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)
After each 7.5 minutes new delegation token obtained from Kafka broker (10 min * 0.75).
But when token expired after 10 minutes (Spark obtains new one and doesn't renew the old), the brokers expiring thread comes after each 5 minutes (invalidates expired tokens) and artificial exception has been thrown inside the Spark application (such case Spark closes connection), then the latest delegation token not always picked up.
Closes#23906 from gaborgsomogyi/SPARK-27002.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Comparing whether Boolean expression is equal to true is redundant
For example:
The datatype of `a` is boolean.
Before:
if (a == true)
After:
if (a)
## How was this patch tested?
N/A
Closes#23884 from 10110346/simplifyboolean.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).
The major changes:
1. rename `StreamingWriteSupport` to `StreamingWrite`
2. add `WriteBuilder.buildForStreaming`
3. update existing sinks, to move the creation of `StreamingWrite` to `Table`
## How was this patch tested?
existing tests
Closes#23702 from cloud-fan/stream-write.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
`HadoopDelegationTokenProvider` has basically the same functionality just like `ServiceCredentialProvider` so the interfaces can be merged.
`YARNHadoopDelegationTokenManager` now loads `ServiceCredentialProvider`s in one step. The drawback of this if one provider fails all others are not loaded. `HadoopDelegationTokenManager` loads `HadoopDelegationTokenProvider`s independently so it provides more robust behaviour.
In this PR I've I've made the following changes:
* Deleted `YARNHadoopDelegationTokenManager` and `ServiceCredentialProvider`
* Made `HadoopDelegationTokenProvider` a `DeveloperApi`
## How was this patch tested?
Existing unit tests.
Closes#23686 from gaborgsomogyi/SPARK-26772.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Currently, looks, to use `from_avro` and `to_avro` in Java APIs side,
```java
import static org.apache.spark.sql.avro.package$.MODULE$;
MODULE$.to_avro
MODULE$.from_avro
```
This PR targets to deprecate and move both functions under `avro` package into `functions` object like the way of our `org.apache.spark.sql.functions`.
Therefore, Java side can import:
```java
import static org.apache.spark.sql.avro.functions.*;
```
and Scala side can import:
```scala
import org.apache.spark.sql.avro.functions._
```
## How was this patch tested?
Manually tested, and unit tests for Java APIs were added.
Closes#23784 from HyukjinKwon/SPARK-26870.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
In the PR, I propose to use `System.nanoTime()` instead of `System.currentTimeMillis()` in measurements of time intervals.
`System.currentTimeMillis()` returns current wallclock time and will follow changes to the system clock. Thus, negative wallclock adjustments can cause timeouts to "hang" for a long time (until wallclock time has caught up to its previous value again). This can happen when ntpd does a "step" after the network has been disconnected for some time. The most canonical example is during system bootup when DHCP takes longer than usual. This can lead to failures that are really hard to understand/reproduce. `System.nanoTime()` is guaranteed to be monotonically increasing irrespective of wallclock changes.
## How was this patch tested?
By existing test suites.
Closes#23727 from MaxGekk/system-nanotime.
Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Delegation token providers interface now has a parameter `fileSystems` but this is needed only for `HadoopFSDelegationTokenProvider`.
In this PR I've addressed this issue in the following way:
* Removed `fileSystems` parameter from `HadoopDelegationTokenProvider`
* Moved `YarnSparkHadoopUtil.hadoopFSsToAccess` into `HadoopFSDelegationTokenProvider`
* Moved `spark.yarn.stagingDir` into core
* Moved `spark.yarn.access.namenodes` into core and renamed to `spark.kerberos.access.namenodes`
* Moved `spark.yarn.access.hadoopFileSystems` into core and renamed to `spark.kerberos.access.hadoopFileSystems`
## How was this patch tested?
Existing unit tests.
Closes#23698 from gaborgsomogyi/SPARK-26766.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In #23639, the API `supportDataType` is refactored. We should also remove the method `verifyWriteSchema` and `verifyReadSchema` in `DataSourceUtils`.
Since the error message use `FileFormat.toString` to specify the data source naming, this PR also overriding the `toString` method in `AvroFileFormat`.
## How was this patch tested?
Unit test.
Closes#23699 from gengliangwang/SPARK-26716-followup.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Fix the integer overflow issue in rateLimit.
## How was this patch tested?
Pass the Jenkins with newly added UT for the possible case where integer could be overflowed.
Closes#23666 from linehrr/master.
Authored-by: ryne.yang <ryne.yang@acuityads.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
The API `supportDataType` in `FileFormat` helps to validate the output/input schema before exection starts. So that we can avoid some invalid data source IO, and users can see clean error messages.
This PR is to override the validation API in Avro data source.
Also, as per the spec of Avro(https://avro.apache.org/docs/1.8.2/spec.html), `NullType` is supported. This PR fixes the handling of `NullType`.
## How was this patch tested?
Unit test
Closes#23684 from gengliangwang/avroSupportDataType.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Following https://github.com/apache/spark/pull/23430, this PR does the API refactor for continuous read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
The major changes:
1. rename `XXXContinuousReadSupport` to `XXXContinuousStream`
2. at the beginning of continuous streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.
3. remove all the hacks as we have finished all the read side API refactor
## How was this patch tested?
existing tests
Closes#23619 from cloud-fan/continuous.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
There are ugly provided dependencies inside core for the following:
* Hive
* Kafka
In this PR I've extracted them out. This PR contains the following:
* Token providers are now loaded with service loader
* Hive token provider moved to hive project
* Kafka token provider extracted into a new project
## How was this patch tested?
Existing + newly added unit tests.
Additionally tested on cluster.
Closes#23499 from gaborgsomogyi/SPARK-26254.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Following https://github.com/apache/spark/pull/23086, this PR does the API refactor for micro-batch read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
The major changes:
1. rename `XXXMicroBatchReadSupport` to `XXXMicroBatchReadStream`
2. implement `TableProvider`, `Table`, `ScanBuilder` and `Scan` for streaming sources
3. at the beginning of micro-batch streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.
followup:
support operator pushdown for stream sources
## How was this patch tested?
existing tests
Closes#23430 from cloud-fan/micro-batch.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
This patch adds the check to verify consumer group id is given correctly when custom group id is provided to Kafka parameter.
## How was this patch tested?
Modified UT.
Closes#23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
This PR allows the user to override `kafka.group.id` for better monitoring or security. The user needs to make sure there are not multiple queries or sources using the same group id.
It also fixes a bug that the `groupIdPrefix` option cannot be retrieved.
## How was this patch tested?
The new added unit tests.
Closes#23301 from zsxwing/SPARK-26350.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
When determining CatalystType for postgres columns with type `numeric[]` set the type of array element to `DecimalType(38, 18)` instead of `DecimalType(0,0)`.
## How was this patch tested?
Tested with modified `org.apache.spark.sql.jdbc.JDBCSuite`.
Ran the `PostgresIntegrationSuite` manually.
Closes#23456 from a-shkarupin/postgres_numeric_array.
Lead-authored-by: Oleksii Shkarupin <a.shkarupin@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
This PR makes `scalastyle` to check `docker-integration-tests` module additionally and fixes one error.
## How was this patch tested?
Pass the Jenkins with the updated Scalastyle.
```
========================================================================
Running Scala style checks
========================================================================
Scalastyle checks passed.
```
Closes#23459 from dongjoon-hyun/SPARK-26541.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
This PR upgrades Mockito from 1.10.19 to 2.23.4. The following changes are required.
- Replace `org.mockito.Matchers` with `org.mockito.ArgumentMatchers`
- Replace `anyObject` with `any`
- Replace `getArgumentAt` with `getArgument` and add type annotation.
- Use `isNull` matcher in case of `null` is invoked.
```scala
saslHandler.channelInactive(null);
- verify(handler).channelInactive(any(TransportClient.class));
+ verify(handler).channelInactive(isNull());
```
- Make and use `doReturn` wrapper to avoid [SI-4775](https://issues.scala-lang.org/browse/SI-4775)
```scala
private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
```
## How was this patch tested?
Pass the Jenkins with the existing tests.
Closes#23452 from dongjoon-hyun/SPARK-26536.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Use of `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. And, [SPARK-21464](https://issues.apache.org/jira/browse/SPARK-21464) minimized it at 2.2.1. Recently, it grows again in test suites. This PR aims to clean up newly introduced deprecation warnings for Spark 3.0.
## How was this patch tested?
Pass the Jenkins with existing tests and manually check the warnings.
Closes#23367 from dongjoon-hyun/SPARK-26428.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return an earliest offset when we are request a latest offset. This will cause Spark to reprocess data.
As per suggestion in KAFKA-7703, we put a position call between poll and seekToEnd to block the fetch request triggered by `poll` before calling `seekToEnd`.
In addition, to avoid other unknown issues, we also use the previous known offsets to audit the latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times.
## How was this patch tested?
Jenkins
Closes#23324 from zsxwing/SPARK-26267.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
SinkProgress should report similar properties like SourceProgress as long as they are available for given Sink. Count of written rows is metric availble for all Sinks. Since relevant progress information is with respect to commited rows, ideal object to carry this info is WriterCommitMessage. For brevity the implementation will focus only on Sinks with API V2 and on Micro Batch mode. Implemention for Continuous mode will be provided at later date.
### Before
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317"}
```
### After
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317","numOutputRows":5000}
```
### This PR is related to:
- https://issues.apache.org/jira/browse/SPARK-24647
- https://issues.apache.org/jira/browse/SPARK-21313
## How was this patch tested?
Existing and new unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#21919 from vackosar/feature/SPARK-24933-numOutputRows.
Lead-authored-by: Vaclav Kosar <admin@vaclavkosar.com>
Co-authored-by: Kosar, Vaclav: Functions Transformation <Vaclav.Kosar@barclayscapital.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
As Kafka delegation token added logic into ConfigUpdater it would be good to test it.
This PR contains the following changes:
* ConfigUpdater extracted to a separate file and renamed to KafkaConfigUpdater
* mockito-core dependency added to kafka-0-10-sql
* Unit tests added
## How was this patch tested?
Existing + new unit tests + on cluster.
Closes#23321 from gaborgsomogyi/SPARK-26371.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
remove a redundant `KafkaWriter.validateQuery` call in `KafkaSourceProvider `
## How was this patch tested?
Just removing duplicate codes, so I just build and run unit tests.
Closes#23309 from JasonWayne/SPARK-26360.
Authored-by: jasonwayne <wuwenjie0102@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
When Kafka delegation token obtained, SCRAM `sasl.mechanism` has to be configured for authentication. This can be configured on the related source/sink which is inconvenient from user perspective. Such granularity is not required and this configuration can be implemented with one central parameter.
In this PR `spark.kafka.sasl.token.mechanism` added to configure this centrally (default: `SCRAM-SHA-512`).
## How was this patch tested?
Existing unit tests + on cluster.
Closes#23274 from gaborgsomogyi/SPARK-26322.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
spark.kafka.sasl.kerberos.service.name is an optional parameter but most of the time value `kafka` has to be set. As I've written in the jira the following reasoning is behind:
* Kafka's configuration guide suggest the same value: https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig
* It would be easier for spark users by providing less configuration
* Other streaming engines are doing the same
In this PR I've changed the parameter from optional to `WithDefault` and set `kafka` as default value.
## How was this patch tested?
Available unit tests + on cluster.
Closes#23254 from gaborgsomogyi/SPARK-26304.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
This is the first step of the data source v2 API refactor [proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
It adds the new API for batch read, without removing the old APIs, as they are still needed for streaming sources.
More concretely, it adds
1. `TableProvider`, works like an anonymous catalog
2. `Table`, represents a structured data set.
3. `ScanBuilder` and `Scan`, a logical represents of data source scan
4. `Batch`, a physical representation of data source batch scan.
## How was this patch tested?
existing tests
Closes#23086 from cloud-fan/refactor-batch.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing)
What this PR contains:
* Configuration parameters for the feature
* Delegation token fetching from broker
* Usage of token through dynamic JAAS configuration
* Minor refactoring in the existing code
What this PR doesn't contain:
* Documentation changes because design can change
## How was this patch tested?
Existing tests + added small amount of additional unit tests.
Because it's an external service integration mainly tested on cluster.
* 4 node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256
An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID HMAC OWNER RENEWERS ISSUEDATE EXPIRYDATE MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user [] 2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```
An example token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```
Closes#22598 from gaborgsomogyi/SPARK-25501.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Currently duplicated map keys are not handled consistently. For example, map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc.
This PR proposes to remove duplicated map keys with last wins policy, to follow Java/Scala and Presto. It only applies to built-in functions, as users can create map with duplicated map keys via private APIs anyway.
updated functions: `CreateMap`, `MapFromArrays`, `MapFromEntries`, `StringToMap`, `MapConcat`, `TransformKeys`.
For other places:
1. data source v1 doesn't have this problem, as users need to provide a java/scala map, which can't have duplicated keys.
2. data source v2 may have this problem. I've added a note to `ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2.
3. UDF doesn't have this problem, as users need to provide a java/scala map. Same as data source v1.
4. file format. I checked all of them and only parquet does not enforce it. For backward compatibility reasons I change nothing but leave a note saying that the behavior will be undefined if users write map with duplicated keys to parquet files. Maybe we can add a config and fail by default if parquet files have map with duplicated keys. This can be done in followup.
## How was this patch tested?
updated tests and new tests
Closes#23124 from cloud-fan/map.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Allow the Spark Structured Streaming user to specify the prefix of the consumer group (group.id), compared to force consumer group ids of the form `spark-kafka-source-*`
## How was this patch tested?
Unit tests provided by Spark (backwards compatible change, i.e., user can optionally use the functionality)
`mvn test -pl external/kafka-0-10`
Closes#23103 from zouzias/SPARK-26121.
Authored-by: Anastasios Zouzias <anastasios@sqooba.io>
Signed-off-by: cody koeninger <cody@koeninger.org>
## What changes were proposed in this pull request?
This is a followup of #23099 . After upgrading to Kafka 2.1.0, maven test fails due to Zookeeper test dependency while sbt test succeeds.
- [sbt test on master branch](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/5203/)
- [maven test on master branch](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5653/)
The root cause is that the embedded Kafka server is using [Zookeepr 3.4.7 API](https://zookeeper.apache.org/doc/r3.4.7/api/org/apache/zookeeper/AsyncCallback.MultiCallback.html
) while Apache Spark provides Zookeeper 3.4.6. This PR adds a test dependency.
```
KafkaMicroBatchV2SourceSuite:
*** RUN ABORTED ***
...
org.apache.spark.sql.kafka010.KafkaTestUtils.setupEmbeddedKafkaServer(KafkaTestUtils.scala:123)
...
Cause: java.lang.ClassNotFoundException: org.apache.zookeeper.AsyncCallback$MultiCallback
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1693)
at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:348)
at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:372)
at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$setupEmbeddedKafkaServer$2(KafkaTestUtils.scala:120)
at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$setupEmbeddedKafkaServer$2$adapted(KafkaTestUtils.scala:116)
...
```
## How was this patch tested?
Pass the maven Jenkins test.
Closes#23119 from dongjoon-hyun/SPARK-25954-2.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
The build has a lot of deprecation warnings. Some are new in Scala 2.12 and Java 11. We've fixed some, but I wanted to take a pass at fixing lots of easy miscellaneous ones here.
They're too numerous and small to list here; see the pull request. Some highlights:
- `BeanInfo` is deprecated in 2.12, and BeanInfo classes are pretty ancient in Java. Instead, case classes can explicitly declare getters
- Eta expansion of zero-arg methods; foo() becomes () => foo() in many cases
- Floating-point Range is inexact and deprecated, like 0.0 to 100.0 by 1.0
- finalize() is finally deprecated (just needs to be suppressed)
- StageInfo.attempId was deprecated and easiest to remove here
I'm not now going to touch some chunks of deprecation warnings:
- Parquet deprecations
- Hive deprecations (particularly serde2 classes)
- Deprecations in generated code (mostly Thriftserver CLI)
- ProcessingTime deprecations (we may need to revive this class as internal)
- many MLlib deprecations because they concern methods that may be removed anyway
- a few Kinesis deprecations I couldn't figure out
- Mesos get/setRole, which I don't know well
- Kafka/ZK deprecations (e.g. poll())
- Kinesis
- a few other ones that will probably resolve by deleting a deprecated method
## How was this patch tested?
Existing tests, including manual testing with the 2.11 build and Java 11.
Closes#23065 from srowen/SPARK-26090.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This restores scaladoc artifact generation, which got dropped with the Scala 2.12 update. The change looks large, but is almost all due to needing to make the InterfaceStability annotations top-level classes (i.e. `InterfaceStability.Stable` -> `Stable`), unfortunately. A few inner class references had to be qualified too.
Lots of scaladoc warnings now reappear. We can choose to disable generation by default and enable for releases, later.
## How was this patch tested?
N/A; build runs scaladoc now.
Closes#23069 from srowen/SPARK-26026.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR makes Spark's default Scala version as 2.12, and Scala 2.11 will be the alternative version. This implies that Scala 2.12 will be used by our CI builds including pull request builds.
We'll update the Jenkins to include a new compile-only jobs for Scala 2.11 to ensure the code can be still compiled with Scala 2.11.
## How was this patch tested?
existing tests
Closes#22967 from dbtsai/scala2.12.
Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Add scala and java lint check rules to ban the usage of `throw new xxxErrors` and fix up all exists instance followed by https://github.com/apache/spark/pull/22989#issuecomment-437939830. See more details in https://github.com/apache/spark/pull/22969.
## How was this patch tested?
Local test with lint-scala and lint-java.
Closes#22989 from xuanyuanking/SPARK-25986.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Add read benchmark for Avro, which is missing for a period.
The benchmark is similar to `DataSourceReadBenchmark` and `OrcReadBenchmark`
Manually run benchmark
Closes#22966 from gengliangwang/avroReadBenchmark.
Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Deprecated in Java 11, replace Class.newInstance with Class.getConstructor.getInstance, and primtive wrapper class constructors with valueOf or equivalent
## How was this patch tested?
Existing tests.
Closes#22988 from srowen/SPARK-25984.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently in `FailureSafeParser` and `from_avro`, the exception is created with such code
```
throw new SparkException("Malformed records are detected in record parsing. " +
s"Parse Mode: ${FailFastMode.name}.", e.cause)
```
1. The cause part should be `e` instead of `e.cause`
2. If `e` contains non-null message, it should be shown in `from_json`/`from_csv`/`from_avro`, e.g.
```
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('1' (code 49)): was expecting a colon to separate field name and value
at [Source: (InputStreamReader); line: 1, column: 7]
```
3.Kindly show hint for trying PERMISSIVE in error message.
## How was this patch tested?
Unit test.
Closes#22895 from gengliangwang/improve_error_msg.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Refactor BuiltInDataSourceWriteBenchmark, DataSourceWriteBenchmark and AvroWriteBenchmark to use main method.
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.BuiltInDataSourceWriteBenchmark"
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "avro/test:runMain org.apache.spark.sql.execution.benchmark.AvroWriteBenchmark"
```
## How was this patch tested?
manual tests
Closes#22861 from yucai/BuiltInDataSourceWriteBenchmark.
Lead-authored-by: yucai <yyu1@ebay.com>
Co-authored-by: Yucai Yu <yucai.yu@foxmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
In this test, i have reduced the test time to 20 secs from 1 minute while reducing the sleep time from 1 sec to 100 milliseconds.
With this change, i was able to run the test in 20+ seconds consistently on my laptop. I would like see if it passes in jenkins consistently.
## How was this patch tested?
Its a test fix.
Closes#22900 from dilipbiswal/SPARK-25618.
Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Previously in from_avro/to_avro, we override the method `simpleString` and `sql` for the string output. However, the override only affects the alias naming:
```
Project [from_avro('col,
...
, (mode,PERMISSIVE)) AS from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))#11]
```
It only makes the alias name quite long: `from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))`).
We should follow `from_csv`/`from_json` here, to override the method prettyName only, and we will get a clean alias name
```
... AS from_avro(col)#11
```
## How was this patch tested?
Manual check
Closes#22890 from gengliangwang/revise_from_to_avro.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Current the function `from_avro` throws exception on reading corrupt records.
In practice, there could be various reasons of data corruption. It would be good to support `PERMISSIVE` mode and allow the function from_avro to process all the input file/streaming, which is consistent with from_json and from_csv. There is no obvious down side for supporting `PERMISSIVE` mode.
Different from `from_csv` and `from_json`, the default parse mode is `FAILFAST` for the following reasons:
1. Since Avro is structured data format, input data is usually able to be parsed by certain schema. In such case, exposing the problems of input data to users is better than hiding it.
2. For `PERMISSIVE` mode, we have to force the data schema as fully nullable. This seems quite unnecessary for Avro. Reversing non-null schema might archive more perf optimizations in Spark.
3. To be consistent with the behavior in Spark 2.4 .
## How was this patch tested?
Unit test
Manual previewing generated html for the Avro data source doc:
![image](https://user-images.githubusercontent.com/1097932/47510100-02558880-d8aa-11e8-9d57-a43daee4c6b9.png)
Closes#22814 from gengliangwang/improve_from_avro.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Remove JavaSparkContextVarargsWorkaround
## How was this patch tested?
Existing tests.
Closes#22729 from srowen/SPARK-25737.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Also update Kinesis SDK's Jackson to match Spark's
## How was this patch tested?
Existing tests, including Kinesis ones, which ought to be hereby triggered.
This was uncovered, I believe, in https://github.com/apache/spark/pull/22729#issuecomment-430666080Closes#22757 from srowen/SPARK-24601.2.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>