## What changes were proposed in this pull request?
This is the first step of the data source v2 API refactor [proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
It adds the new API for batch read, without removing the old APIs, as they are still needed for streaming sources.
More concretely, it adds
1. `TableProvider`, works like an anonymous catalog
2. `Table`, represents a structured data set.
3. `ScanBuilder` and `Scan`, a logical represents of data source scan
4. `Batch`, a physical representation of data source batch scan.
## How was this patch tested?
existing tests
Closes#23086 from cloud-fan/refactor-batch.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing)
What this PR contains:
* Configuration parameters for the feature
* Delegation token fetching from broker
* Usage of token through dynamic JAAS configuration
* Minor refactoring in the existing code
What this PR doesn't contain:
* Documentation changes because design can change
## How was this patch tested?
Existing tests + added small amount of additional unit tests.
Because it's an external service integration mainly tested on cluster.
* 4 node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256
An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID HMAC OWNER RENEWERS ISSUEDATE EXPIRYDATE MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user [] 2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```
An example token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```
Closes#22598 from gaborgsomogyi/SPARK-25501.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Allow the Spark Structured Streaming user to specify the prefix of the consumer group (group.id), compared to force consumer group ids of the form `spark-kafka-source-*`
## How was this patch tested?
Unit tests provided by Spark (backwards compatible change, i.e., user can optionally use the functionality)
`mvn test -pl external/kafka-0-10`
Closes#23103 from zouzias/SPARK-26121.
Authored-by: Anastasios Zouzias <anastasios@sqooba.io>
Signed-off-by: cody koeninger <cody@koeninger.org>
## What changes were proposed in this pull request?
This is a followup of #23099 . After upgrading to Kafka 2.1.0, maven test fails due to Zookeeper test dependency while sbt test succeeds.
- [sbt test on master branch](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/5203/)
- [maven test on master branch](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5653/)
The root cause is that the embedded Kafka server is using [Zookeepr 3.4.7 API](https://zookeeper.apache.org/doc/r3.4.7/api/org/apache/zookeeper/AsyncCallback.MultiCallback.html
) while Apache Spark provides Zookeeper 3.4.6. This PR adds a test dependency.
```
KafkaMicroBatchV2SourceSuite:
*** RUN ABORTED ***
...
org.apache.spark.sql.kafka010.KafkaTestUtils.setupEmbeddedKafkaServer(KafkaTestUtils.scala:123)
...
Cause: java.lang.ClassNotFoundException: org.apache.zookeeper.AsyncCallback$MultiCallback
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1693)
at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:348)
at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:372)
at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$setupEmbeddedKafkaServer$2(KafkaTestUtils.scala:120)
at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$setupEmbeddedKafkaServer$2$adapted(KafkaTestUtils.scala:116)
...
```
## How was this patch tested?
Pass the maven Jenkins test.
Closes#23119 from dongjoon-hyun/SPARK-25954-2.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
The build has a lot of deprecation warnings. Some are new in Scala 2.12 and Java 11. We've fixed some, but I wanted to take a pass at fixing lots of easy miscellaneous ones here.
They're too numerous and small to list here; see the pull request. Some highlights:
- `BeanInfo` is deprecated in 2.12, and BeanInfo classes are pretty ancient in Java. Instead, case classes can explicitly declare getters
- Eta expansion of zero-arg methods; foo() becomes () => foo() in many cases
- Floating-point Range is inexact and deprecated, like 0.0 to 100.0 by 1.0
- finalize() is finally deprecated (just needs to be suppressed)
- StageInfo.attempId was deprecated and easiest to remove here
I'm not now going to touch some chunks of deprecation warnings:
- Parquet deprecations
- Hive deprecations (particularly serde2 classes)
- Deprecations in generated code (mostly Thriftserver CLI)
- ProcessingTime deprecations (we may need to revive this class as internal)
- many MLlib deprecations because they concern methods that may be removed anyway
- a few Kinesis deprecations I couldn't figure out
- Mesos get/setRole, which I don't know well
- Kafka/ZK deprecations (e.g. poll())
- Kinesis
- a few other ones that will probably resolve by deleting a deprecated method
## How was this patch tested?
Existing tests, including manual testing with the 2.11 build and Java 11.
Closes#23065 from srowen/SPARK-26090.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR makes Spark's default Scala version as 2.12, and Scala 2.11 will be the alternative version. This implies that Scala 2.12 will be used by our CI builds including pull request builds.
We'll update the Jenkins to include a new compile-only jobs for Scala 2.11 to ensure the code can be still compiled with Scala 2.11.
## How was this patch tested?
existing tests
Closes#22967 from dbtsai/scala2.12.
Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Deprecated in Java 11, replace Class.newInstance with Class.getConstructor.getInstance, and primtive wrapper class constructors with valueOf or equivalent
## How was this patch tested?
Existing tests.
Closes#22988 from srowen/SPARK-25984.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
In this test, i have reduced the test time to 20 secs from 1 minute while reducing the sleep time from 1 sec to 100 milliseconds.
With this change, i was able to run the test in 20+ seconds consistently on my laptop. I would like see if it passes in jenkins consistently.
## How was this patch tested?
Its a test fix.
Closes#22900 from dilipbiswal/SPARK-25618.
Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
`Literal.value` should have a value a value corresponding to `dataType`. This pr added code to verify it and fixed the existing tests to do so.
## How was this patch tested?
Modified the existing tests.
Closes#22724 from maropu/SPARK-25734.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Specify `kafka.max.block.ms` to 10 seconds while creating the kafka writer. In the absence of this overridden config, by default it uses a default time out of 60 seconds.
With this change the test completes in close to 10 seconds as opposed to 1 minute.
## How was this patch tested?
This is a test fix.
Closes#22671 from dilipbiswal/SPARK-25615.
Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR fixes the Scala-2.12 build error due to ambiguity in `foreachBatch` test cases.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/428/console
```scala
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:102: ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and method foreachBatch in class DataStreamWriter of type (function: (org.apache.spark.sql.Dataset[Int], Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error] ds.writeStream.foreachBatch((_, _) => {}).trigger(Trigger.Continuous("1 second")).start()
[error] ^
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:106: ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and method foreachBatch in class DataStreamWriter of type (function: (org.apache.spark.sql.Dataset[Int], Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error] ds.writeStream.foreachBatch((_, _) => {}).partitionBy("value").start()
[error] ^
```
## How was this patch tested?
Manual.
Since this failure occurs in Scala-2.12 profile and test cases, Jenkins will not test this. We need to build with Scala-2.12 and run the tests.
Closes#22649 from dongjoon-hyun/SPARK-SCALA212.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
This patch is to bump the master branch version to 3.0.0-SNAPSHOT.
## How was this patch tested?
N/A
Closes#22606 from gatorsmile/bump3.0.
Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
`FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may make Kafka connector return wrong results.
## How was this patch tested?
The new unit test.
Closes#22507 from zsxwing/fix-kafka-reset.
Lead-authored-by: Shixiong Zhu <zsxwing@gmail.com>
Co-authored-by: Shixiong Zhu <shixiong@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
For self-join/self-union, Spark will produce a physical plan which has multiple `DataSourceV2ScanExec` instances referring to the same `ReadSupport` instance. In this case, the streaming source is indeed scanned multiple times, and the `numInputRows` metrics should be counted for each scan.
Actually we already have 2 test cases to verify the behavior:
1. `StreamingQuerySuite.input row calculation with same V2 source used twice in self-join`
2. `KafkaMicroBatchSourceSuiteBase.ensure stream-stream self-join generates only one offset in log and correct metrics`.
However, in these 2 tests, the expected result is different, which is super confusing. It turns out that, the first test doesn't trigger exchange reuse, so the source is scanned twice. The second test triggers exchange reuse, and the source is scanned only once.
This PR proposes to improve these 2 tests, to test with/without exchange reuse.
## How was this patch tested?
test only change
Closes#22402 from cloud-fan/bug.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
In the dev list, we can still discuss whether the next version is 2.5.0 or 3.0.0. Let us first bump the master branch version to `2.5.0-SNAPSHOT`.
## How was this patch tested?
N/A
Closes#22426 from gatorsmile/bumpVersionMaster.
Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
This PR ensures to call `super.afterAll()` in `override afterAll()` method for test suites.
* Some suites did not call `super.afterAll()`
* Some suites may call `super.afterAll()` only under certain condition
* Others never call `super.afterAll()`.
This PR also ensures to call `super.beforeAll()` in `override beforeAll()` for test suites.
## How was this patch tested?
Existing UTs
Closes#22337 from kiszk/SPARK-25338.
Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Fix unused imports & outdated comments on `kafka-0-10-sql` module. (Found while I was working on [SPARK-23539](https://github.com/apache/spark/pull/22282))
## How was this patch tested?
Existing unit tests.
Closes#22342 from dongjinleekr/feature/fix-kafka-sql-trivials.
Authored-by: Lee Dongjin <dongjin@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Revert SPARK-24863 (#21819) and SPARK-24748 (#21721) as per discussion in #21721. We will revisit them when the data source v2 APIs are out.
## How was this patch tested?
Jenkins
Closes#22334 from zsxwing/revert-SPARK-24863-SPARK-24748.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. It will contains some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the date returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support:
- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch
They are all covered by the new unit tests.
## How was this patch tested?
The new unit tests.
Closes#22042 from zsxwing/kafka-transaction-read.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration.
We caught most instances of this in the original PR, but this one slipped through.
## How was this patch tested?
n/a
Closes#22245 from jose-torres/fixflake.
Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
This is a follow up PR for #22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query.
## How was this patch tested?
Jenkins.
Closes#22230 from zsxwing/SPARK-25214-2.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets.
This PR fixes the issue and also adds regression tests for all Kafka readers.
## How was this patch tested?
New tests.
Closes#22207 from zsxwing/SPARK-25214.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
This pr proposed to show RDD/relation names in RDD/Hive table scan nodes.
This change made these names show up in the webUI and explain results.
For example;
```
scala> sql("CREATE TABLE t(c1 int) USING hive")
scala> sql("INSERT INTO t VALUES(1)")
scala> spark.table("t").explain()
== Physical Plan ==
Scan hive default.t [c1#8], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#8]
^^^^^^^^^^^
```
<img width="212" alt="spark-pr-hive" src="https://user-images.githubusercontent.com/692303/44501013-51264c80-a6c6-11e8-94f8-0704aee83bb6.png">
Closes#20226
## How was this patch tested?
Added tests in `DataFrameSuite`, `DatasetSuite`, and `HiveExplainSuite`
Closes#22153 from maropu/pr20226.
Lead-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Tejas Patil <tejasp@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
The race condition that caused test failure is between 2 threads.
- The MicrobatchExecution thread that processes inputs to produce answers and then generates progress events.
- The test thread that generates some input data, checked the answer and then verified the query generated progress event.
The synchronization structure between these threads is as follows
1. MicrobatchExecution thread, in every batch, does the following in order.
a. Processes batch input to generate answer.
b. Signals `awaitProgressLockCondition` to wake up threads waiting for progress using `awaitOffset`
c. Generates progress event
2. Test execution thread
a. Calls `awaitOffset` to wait for progress, which waits on `awaitProgressLockCondition`.
b. As soon as `awaitProgressLockCondition` is signaled, it would move on the in the test to check answer.
c. Finally, it would verify the last generated progress event.
What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b -> 2c -> 1c.
In other words, the progress event may be generated after the test tries to verify it.
The solution has two steps.
1. Signal the waiting thread after the progress event has been generated, that is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a large value.
This latter is to ensure that test thread for keeps waiting on `awaitProgressLockCondition`until the MicroBatchExecution thread explicitly signals it. With the existing small timeout of 100ms the following sequence can occur.
- MicroBatchExecution thread updates committed offsets
- Test thread waiting on `awaitProgressLockCondition` accidentally times out after 100 ms, finds that the committed offsets have been updated, therefore returns from `awaitOffset` and moves on to the progress event tests.
- MicroBatchExecution thread then generates progress event and signals. But the test thread has already attempted to verify the event and failed.
By increasing the timeout to large (e.g., `streamingTimeoutMs = 60 seconds`, similar to `awaitInitialization`), this above type of race condition is also avoided.
## How was this patch tested?
Ran locally many times.
Closes#22182 from tdas/SPARK-25184.
Authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
## What changes were proposed in this pull request?
Improve the data source v2 API according to the [design doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing)
summary of the changes
1. rename `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> `InputPartitionReader` to `BatchReadSupportProvider` -> `BatchReadSupport` -> `InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar renaming also happens at streaming and write APIs.
2. create `ScanConfig` to store query specific information like operator pushdown result, streaming offsets, etc. This makes batch and streaming `ReadSupport`(previouslly named `DataSourceReader`) immutable. All other methods take `ScanConfig` as input, which implies applying operator pushdown and getting streaming offsets happen before all other things(get input partitions, report statistics, etc.).
3. separate `InputPartition` to `InputPartition` and `PartitionReaderFactory`. This is a natural separation, data splitting and reading are orthogonal and we should not mix them in one interfaces. This also makes the naming consistent between read and write API: `PartitionReaderFactory` vs `DataWriterFactory`.
4. separate the batch and streaming interfaces. Sometimes it's painful to force the streaming interface to extend batch interface, as we may need to override some batch methods to return false, or even leak the streaming concept to batch API(e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`)
Some follow-ups we should do after this PR (tracked by https://issues.apache.org/jira/browse/SPARK-25186 ):
1. Revisit the life cycle of `ReadSupport` instances. Currently I keep it same as the previous `DataSourceReader`, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be perfect for batch source. We can also consider to let `ReadSupport.newScanConfigBuilder` take `DataSourceOptions` as parameter, if we decide to change the life cycle.
2. Add `WriteConfig`. This is similar to `ScanConfig` and makes the write API more flexible. But it's only needed when we add the `replaceWhere` support, and it needs to change the streaming execution engine for this new concept, which I think is better to be done in another PR.
3. Refine the document. This PR adds/changes a lot of document and it's very likely that some people may have better ideas.
4. Figure out the life cycle of `CustomMetrics`. It looks to me that it should be bound to a `ScanConfig`, but we need to change `ProgressReporter` to get the `ScanConfig`. Better to be done in another PR.
5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. using the `SupportsPushdownXYZ` traits. We can design a better API using build pattern, but this is a complicated design and deserves an individual JIRA ticket and design doc.
6. Improve the continuous streaming engine to only create a new `ScanConfig` when re-configuring.
7. Remove `SupportsPushdownCatalystFilter`. This is actually not a must-have for file source, we can change the hive partition pruning to use the public `Filter`.
## How was this patch tested?
existing tests.
Closes#22009 from cloud-fan/redesign.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
This builds on top of SPARK-24748 to report 'offset lag' as a custom metrics for Kafka structured streaming source.
This lag is the difference between the latest offsets in Kafka the time the metrics is reported (just after a micro-batch completes) and the latest offset Spark has processed. It can be 0 (or close to 0) if spark keeps up with the rate at which messages are ingested into Kafka topics in steady state. This measures how far behind the spark source has fallen behind (per partition) and can aid in tuning the application.
## How was this patch tested?
Existing and new unit tests
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#21819 from arunmahadevan/SPARK-24863.
Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster but `afterAll` only shuts down one cluster. This leaks a Kafka cluster and causes that some Kafka thread crash and kill JVM when SBT is trying to clean up tests.
This PR fixes the leak and also adds a shut down hook to detect Kafka cluster leak.
In additions, it also fixes `AdminClient` leak and cleans up cached producers (When a record is writtn using a producer, the producer will keep refreshing the topic and I don't find an API to clear it except closing the producer) to eliminate the following annoying logs:
```
8/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could not be established. Broker may not be available.
18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not be established. Broker may not be available.
18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not be established. Broker may not be available.
```
I also reverted b5eb54244e introduced by #22097 since it doesn't help.
## How was this patch tested?
Jenkins
Closes#22106 from zsxwing/SPARK-25116.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
I'm still seeing the Kafka tests failed randomly due to `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. I checked the test output and saw zookeeper picked up an ipv6 address. Most details can be found in https://issues.apache.org/jira/browse/KAFKA-7193
This PR just uses `127.0.0.1` rather than `localhost` to make sure zookeeper will never use an ipv6 address.
## How was this patch tested?
Jenkins
Closes#22097 from zsxwing/fix-zookeeper-connect.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
This PR fixes typo regarding `auxiliary verb + verb[s]`. This is a follow-on of #21956.
## How was this patch tested?
N/A
Closes#22040 from kiszk/spellcheck1.
Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
A follow up of #21118
Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21948 from cloud-fan/row-write.
## What changes were proposed in this pull request?
This small fix adds a `consumer.release()` call to `KafkaSourceRDD` in the case where we've retrieved offsets from Kafka, but the `fromOffset` is equal to the `lastOffset`, meaning there is no new data to read for a particular topic partition. Up until now, we'd just return an empty iterator without closing the consumer which would cause a FD leak.
If accepted, this pull request should be merged into master as well.
## How was this patch tested?
Haven't ran any specific tests, would love help on how to test methods running inside `RDD.compute`.
Author: Yuval Itzchakov <yuval.itzchakov@clicktale.com>
Closes#21997 from YuvalItzchakov/master.
## What changes were proposed in this pull request?
Increase ZK timeout and harmonize configs across Kafka tests to resol…ve potentially flaky test failure
## How was this patch tested?
Existing tests
Author: Sean Owen <srowen@gmail.com>
Closes#21995 from srowen/SPARK-18057.3.
## What changes were proposed in this pull request?
Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 2.12. It won't compile for 2.12 otherwise.
## How was this patch tested?
Existing tests.
Author: Sean Owen <srowen@gmail.com>
Closes#21955 from srowen/SPARK-18057.2.
## What changes were proposed in this pull request?
This PR addresses issues 2,3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).
* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2).
* We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue, either add () at the end of the closure or use the trick described in the doc. Otherwise overloading resolution does not work (we are not going to eliminate either of the methods) here. Compiler tries to adapt to Unit and makes these two methods candidates for overloading, when there is polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good but it serves its purpose as we need to support two different uses for method: `addTaskCompletionListener`. One that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue3).
Note: regarding issue 1 in the doc the plan is:
> Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.
## How was this patch tested?
This was manually tested:
```./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None```
Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Closes#21930 from skonto/scala2.12-sup.
## What changes were proposed in this pull request?
This PR upgrades to the Kafka 2.0.0 release where KIP-266 is integrated.
## How was this patch tested?
This PR uses existing Kafka related unit tests
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: tedyu <yuzhihong@gmail.com>
Closes#21488 from tedyu/master.
## What changes were proposed in this pull request?
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.
Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.
Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.
## How was this patch tested?
This uses existing tests.
Author: Ryan Blue <blue@apache.org>
Closes#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
## What changes were proposed in this pull request?
As stated in https://github.com/apache/spark/pull/21321, in the error messages we should use `catalogString`. This is not the case, as SPARK-22893 used `simpleString` in order to have the same representation everywhere and it missed some places.
The PR unifies the messages using alway the `catalogString` representation of the dataTypes in the messages.
## How was this patch tested?
existing/modified UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21804 from mgaido91/SPARK-24268_catalog.
## What changes were proposed in this pull request?
SPARK-22893 tried to unify error messages about dataTypes. Unfortunately, still many places were missing the `simpleString` method in other to have the same representation everywhere.
The PR unified the messages using alway the simpleString representation of the dataTypes in the messages.
## How was this patch tested?
existing/modified UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21321 from mgaido91/SPARK-24268.
This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.
For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.
Closes#21558
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>
Closes#21606 from vanzin/SPARK-24552.2.
## What changes were proposed in this pull request?
This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix the issue that reading "spark.network.timeout" using a wrong time unit when the user doesn't specify a time out.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#21382 from zsxwing/fix-network-timeout-conf.
## What changes were proposed in this pull request?
`CachedKafkaConsumer` in the project streaming-kafka-0-10 is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one thread trying to read the same Kafka TopicPartition at the same time. This assumption is not true all the time and this can inadvertently lead to ConcurrentModificationException.
Here is a better way to design this. The consumer pool should be smart enough to avoid concurrent use of a cached consumer. If there is another request for the same TopicPartition as a currently in-use consumer, the pool should automatically return a fresh consumer.
- There are effectively two kinds of consumer that may be generated
- Cached consumer - this should be returned to the pool at task end
- Non-cached consumer - this should be closed at task end
- A trait called `KafkaDataConsumer` is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply call `val consumer = KafkaDataConsumer.acquire` and then `consumer.release`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is request for a consumer which is a task reattempt, then already existing cached consumer will be invalidated and a new consumer is generated. This could fix potential issues if the source of the reattempt is a malfunctioning consumer.
- In addition, I renamed the `CachedKafkaConsumer` class to `KafkaDataConsumer` because is a misnomer given that what it returns may or may not be cached.
## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same TopicPartition from the consumer pool.
Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Closes#20997 from gaborgsomogyi/SPARK-19185.
## What changes were proposed in this pull request?
SPARK-24073 renames DataReaderFactory -> InputPartition and DataReader -> InputPartitionReader. Some classes still reflects the old name and causes confusion. This patch renames the left over classes to reflect the new interface and fixes a few comments.
## How was this patch tested?
Existing unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Arun Mahadevan <arunm@apache.org>
Closes#21355 from arunmahadevan/SPARK-24308.
## What changes were proposed in this pull request?
Renames:
* `DataReaderFactory` to `InputPartition`
* `DataReader` to `InputPartitionReader`
* `createDataReaderFactories` to `planInputPartitions`
* `createUnsafeDataReaderFactories` to `planUnsafeInputPartitions`
* `createBatchDataReaderFactories` to `planBatchInputPartitions`
This fixes the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.
ReadTask/DataReader function as Iterable/Iterator. One InputPartition is
a specific partition of the data to be read, in contrast to
DataWriterFactory where the same factory instance is used in all write
tasks. InputPartition's purpose is to manage the lifecycle of the
associated reader, which is now called InputPartitionReader, with an
explicit create operation to mirror the close operation. This was no
longer clear from the API because DataReaderFactory appeared to be more
generic than it is and it isn't clear why a set of them is produced for
a read.
## How was this patch tested?
Existing tests, which have been updated to use the new name.
Author: Ryan Blue <blue@apache.org>
Closes#21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.
## What changes were proposed in this pull request?
This makes it easy to understand at runtime which version is running. Great for debugging production issues.
## How was this patch tested?
Not necessary.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21160 from tdas/SPARK-24094.
## What changes were proposed in this pull request?
In some streaming queries, the input and processing rates are not calculated at all (shows up as zero) because MicroBatchExecution fails to associated metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The way this executed-plan-leaf-to-logical-source attribution works is as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source. So did a best-effort attempt to match logical and execution plan leaves when the number of leaves were same. In cases where the number of leaves is different, we just give up and report zero rates. An example where this may happen is as follows.
```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...
val query = streamingInputDF.join(cachedStaticDF).writeStream....
```
In this case, the `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it only has leaf because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves causing the input rates to be computed as zero.
With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So its easy to associate the metrics to the original streaming sources.
In this PR, the solution is as follows. If all the streaming sources in a streaming query as v2 sources, then use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to existing mapping logic.
## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21126 from tdas/SPARK-24050.
## What changes were proposed in this pull request?
Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the Kafk aMicroBatchReader is created. However, we create dummy KafkaMicroBatchReader to get the schema and immediately stop it. Its better to make the consumer creation lazy, it will be created on the first attempt to fetch offsets using the KafkaOffsetReader.
## How was this patch tested?
Existing unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21134 from tdas/SPARK-24056.
## What changes were proposed in this pull request?
There was no check on nullability for arguments of `Tuple`s. This could lead to have weird behavior when a null value had to be deserialized into a non-nullable Scala object: in those cases, the `null` got silently transformed in a valid value (like `-1` for `Int`), corresponding to the default value we are using in the SQL codebase. This situation was very likely to happen when deserializing to a Tuple of primitive Scala types (like Double, Int, ...).
The PR adds the `AssertNotNull` to arguments of tuples which have been asked to be converted to non-nullable types.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#20976 from mgaido91/SPARK-23835.