## What changes were proposed in this pull request?
Looks Avro uses direct `getLogger` to create a SLF4J logger. Should better use `internal.Logging` instead.
## How was this patch tested?
Exiting tests.
Author: hyukjinkwon <gurwls223@apache.org>
Closes#21914 from HyukjinKwon/avro-log.
## What changes were proposed in this pull request?
Add one more test case for `com.databricks.spark.avro`.
## How was this patch tested?
N/A
Author: Xiao Li <gatorsmile@gmail.com>
Closes#21906 from gatorsmile/avro.
## What changes were proposed in this pull request?
In the PR, I added new option for Avro datasource - `compression`. The option allows to specify compression codec for saved Avro files. This option is similar to `compression` option in another datasources like `JSON` and `CSV`.
Also I added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`. I put the configs into `SQLConf`. If the `compression` option is not specified by an user, the first SQL config is taken into account.
## How was this patch tested?
I added new test which read meta info from written avro files and checks `avro.codec` property.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21837 from MaxGekk/avro-compression.
## What changes were proposed in this pull request?
In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session
configuration will come into effect.
Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21873 from gengliangwang/linterRule.
## What changes were proposed in this pull request?
This PR aims to the followings.
1. Like `com.databricks.spark.csv` mapping, we had better map `com.databricks.spark.avro` to built-in Avro data source.
2. Remove incorrect error message, `Please find an Avro package at ...`.
## How was this patch tested?
Pass the newly added tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#21878 from dongjoon-hyun/SPARK-24924.
## What changes were proposed in this pull request?
After rethinking on the artifactId, I think it should be `spark-avro` instead of `spark-sql-avro`, which is simpler, and consistent with the previous artifactId. I think we need to change it before Spark 2.4 release.
Also a tiny change: use `spark.sessionState.newHadoopConf()` to get the hadoop configuration, thus the related hadoop configurations in SQLConf will come into effect.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21866 from gengliangwang/avro_followup.
## What changes were proposed in this pull request?
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.
Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.
Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.
## How was this patch tested?
This uses existing tests.
Author: Ryan Blue <blue@apache.org>
Closes#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
## What changes were proposed in this pull request?
To implement the method `buildReader` in `FileFormat`, it is required to serialize the hadoop configuration for executors.
Previous spark-avro uses its own class `SerializableConfiguration` for the serialization. As now it is part of Spark, we can use SerializableConfiguration in Spark util to deduplicate the code.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21846 from gengliangwang/removeSerializableConfiguration.
## What changes were proposed in this pull request?
As per Reynold's comment: https://github.com/apache/spark/pull/21742#discussion_r203496489
It makes sense to remove the implicit class AvroDataFrameWriter/AvroDataFrameReader, since the Avro package is external module.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21841 from gengliangwang/removeImplicit.
## What changes were proposed in this pull request?
1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.
2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.
I created #21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/
Additional changes In this PR:
1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.
## How was this patch tested?
Unit test
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
```
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21838 from gengliangwang/from_and_to_avro.
## What changes were proposed in this pull request?
I propose to add new option for AVRO datasource which should control ignoring of files without `.avro` extension in read. The option has name `ignoreExtension` with default value `true`. If both options `ignoreExtension` and `avro.mapred.ignore.inputs.without.extension` are set, `ignoreExtension` overrides the former one. Here is an example of usage:
```
spark
.read
.option("ignoreExtension", false)
.avro("path to avro files")
```
## How was this patch tested?
I added a test which checks the option directly and a test for checking that new option overrides hadoop's config.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21798 from MaxGekk/avro-ignore-extension.
## What changes were proposed in this pull request?
Previously in the refactoring of Avro Serializer and Deserializer, a new class SerializableSchema is created for serializing the Avro schema:
https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37
On second thought, we can use `toString` method for serialization. After that, parse the JSON format schema on executor. This makes the code much simpler.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21829 from gengliangwang/removeSerializableSchema.
## What changes were proposed in this pull request?
Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.
Add a new function to_avro for converting a column into binary of avro format with the specified schema.
This PR is in progress. Will add test cases.
## How was this patch tested?
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21774 from gengliangwang/from_and_to_avro.
## What changes were proposed in this pull request?
As stated in https://github.com/apache/spark/pull/21321, in the error messages we should use `catalogString`. This is not the case, as SPARK-22893 used `simpleString` in order to have the same representation everywhere and it missed some places.
The PR unifies the messages using alway the `catalogString` representation of the dataTypes in the messages.
## How was this patch tested?
existing/modified UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21804 from mgaido91/SPARK-24268_catalog.
## What changes were proposed in this pull request?
In the PR, I propose to put all `Avro` options in new class `AvroOptions` in the same way as for other datasources `JSON` and `CSV`.
## How was this patch tested?
It was tested by `AvroSuite`
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21810 from MaxGekk/avro-options.
## What changes were proposed in this pull request?
This pr fixes lint-java and Scala 2.12 build.
lint-java:
```
[ERROR] src/test/resources/log4j.properties:[0] (misc) NewlineAtEndOfFile: File does not end with a newline.
```
Scala 2.12 build:
```
[error] /.../sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala:121: overloaded method value addTaskCompletionListener with alternatives:
[error] (f: org.apache.spark.TaskContext => Unit)org.apache.spark.TaskContext <and>
[error] (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] cannot be applied to (org.apache.spark.TaskContext => java.util.List[Runnable])
[error] context.addTaskCompletionListener { ctx =>
[error] ^
```
## How was this patch tested?
Manually executed lint-java and Scala 2.12 build in my local environment.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21801 from ueshin/issues/SPARK-24386_24768/fix_build.
## What changes were proposed in this pull request?
In the PR, I propose to change default behaviour of AVRO datasource which currently ignores files without `.avro` extension in read by default. This PR sets the default value for `avro.mapred.ignore.inputs.without.extension` to `false` in the case if the parameter is not set by an user.
## How was this patch tested?
Added a test file without extension in AVRO format, and new test for reading the file with and wihout specified schema.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>
Closes#21769 from MaxGekk/avro-without-extension.
## What changes were proposed in this pull request?
In the PR, I propose to move `testFile()` to the common trait `SQLTestUtilsBase` and wrap test files in `AvroSuite` by the method `testFile()` which returns full paths to test files in the resource folder.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21773 from MaxGekk/test-file.
## What changes were proposed in this pull request?
Currently the Avro Deserializer converts input Avro format data to `Row`, and then convert the `Row` to `InternalRow`.
While the Avro Serializer converts `InternalRow` to `Row`, and then output Avro format data.
This PR allows direct conversion between `InternalRow` and Avro format data.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21762 from gengliangwang/avro_io.
## What changes were proposed in this pull request?
Improve Avro unit test:
1. use QueryTest/SharedSQLContext/SQLTestUtils, instead of the duplicated test utils.
2. replace deprecated methods
This is a follow up PR for #21760, the PR passes pull request tests but failed in: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7842/
This PR is to fix it.
## How was this patch tested?
Unit test.
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile
```
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21768 from gengliangwang/improve_avro_test.
## What changes were proposed in this pull request?
Improve Avro unit test:
1. use QueryTest/SharedSQLContext/SQLTestUtils, instead of the duplicated test utils.
2. replace deprecated methods
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21760 from gengliangwang/improve_avro_test.
We have hundreds of kafka topics need to be consumed in one application. The application master will throw OOM exception after hanging for nearly half of an hour.
OOM happens in the env with a lot of topics, and it's not convenient to set up such kind of env in the unit test. So I didn't change/add test case.
Author: Yuanbo Liu <yuanbo@Yuanbos-MacBook-Air.local>
Author: yuanbo <yuanbo@apache.org>
Closes#21690 from yuanboliu/master.
## What changes were proposed in this pull request?
Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystem, especially for Kafka-based data pipelines. Using the external package https://github.com/databricks/spark-avro, Spark SQL can read and write the avro data. Making spark-Avro built-in can provide a better experience for first-time users of Spark SQL and structured streaming. We expect the built-in Avro data source can further improve the adoption of structured streaming.
The proposal is to inline code from spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.
[Built-in AVRO Data Source In Spark 2.4.pdf](https://github.com/apache/spark/files/2181511/Built-in.AVRO.Data.Source.In.Spark.2.4.pdf)
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21742 from gengliangwang/export_avro.
Setting the timestamp directly would cause exception on reading stream, it can be set directly only if the mode is not AT_TIMESTAMP
## What changes were proposed in this pull request?
The last patch in the kinesis streaming receiver sets the timestamp for the mode AT_TIMESTAMP, but this mode can only be set via the
`baseClientLibConfiguration.withTimestampAtInitialPositionInStream()
`
and can't be set directly using
`.withInitialPositionInStream()`
This patch fixes the issue.
## How was this patch tested?
Kinesis Receiver doesn't expose the internal state outside, so couldn't find the right way to test this change. Seeking for tips from other contributors here.
Author: Yash Sharma <ysharma@atlassian.com>
Closes#21541 from yashs360/ysharma/fix_kinesis_bug.
## What changes were proposed in this pull request?
SPARK-22893 tried to unify error messages about dataTypes. Unfortunately, still many places were missing the `simpleString` method in other to have the same representation everywhere.
The PR unified the messages using alway the simpleString representation of the dataTypes in the messages.
## How was this patch tested?
existing/modified UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21321 from mgaido91/SPARK-24268.
This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.
For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.
Closes#21558
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>
Closes#21606 from vanzin/SPARK-24552.2.
## What changes were proposed in this pull request?
This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix the issue that reading "spark.network.timeout" using a wrong time unit when the user doesn't specify a time out.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#21382 from zsxwing/fix-network-timeout-conf.
## What changes were proposed in this pull request?
`CachedKafkaConsumer` in the project streaming-kafka-0-10 is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one thread trying to read the same Kafka TopicPartition at the same time. This assumption is not true all the time and this can inadvertently lead to ConcurrentModificationException.
Here is a better way to design this. The consumer pool should be smart enough to avoid concurrent use of a cached consumer. If there is another request for the same TopicPartition as a currently in-use consumer, the pool should automatically return a fresh consumer.
- There are effectively two kinds of consumer that may be generated
- Cached consumer - this should be returned to the pool at task end
- Non-cached consumer - this should be closed at task end
- A trait called `KafkaDataConsumer` is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply call `val consumer = KafkaDataConsumer.acquire` and then `consumer.release`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is request for a consumer which is a task reattempt, then already existing cached consumer will be invalidated and a new consumer is generated. This could fix potential issues if the source of the reattempt is a malfunctioning consumer.
- In addition, I renamed the `CachedKafkaConsumer` class to `KafkaDataConsumer` because is a misnomer given that what it returns may or may not be cached.
## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same TopicPartition from the consumer pool.
Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Closes#20997 from gaborgsomogyi/SPARK-19185.
## What changes were proposed in this pull request?
SPARK-24073 renames DataReaderFactory -> InputPartition and DataReader -> InputPartitionReader. Some classes still reflects the old name and causes confusion. This patch renames the left over classes to reflect the new interface and fixes a few comments.
## How was this patch tested?
Existing unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Arun Mahadevan <arunm@apache.org>
Closes#21355 from arunmahadevan/SPARK-24308.
## What changes were proposed in this pull request?
Renames:
* `DataReaderFactory` to `InputPartition`
* `DataReader` to `InputPartitionReader`
* `createDataReaderFactories` to `planInputPartitions`
* `createUnsafeDataReaderFactories` to `planUnsafeInputPartitions`
* `createBatchDataReaderFactories` to `planBatchInputPartitions`
This fixes the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.
ReadTask/DataReader function as Iterable/Iterator. One InputPartition is
a specific partition of the data to be read, in contrast to
DataWriterFactory where the same factory instance is used in all write
tasks. InputPartition's purpose is to manage the lifecycle of the
associated reader, which is now called InputPartitionReader, with an
explicit create operation to mirror the close operation. This was no
longer clear from the API because DataReaderFactory appeared to be more
generic than it is and it isn't clear why a set of them is produced for
a read.
## How was this patch tested?
Existing tests, which have been updated to use the new name.
Author: Ryan Blue <blue@apache.org>
Closes#21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.
## What changes were proposed in this pull request?
This makes it easy to understand at runtime which version is running. Great for debugging production issues.
## How was this patch tested?
Not necessary.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21160 from tdas/SPARK-24094.
## What changes were proposed in this pull request?
In some streaming queries, the input and processing rates are not calculated at all (shows up as zero) because MicroBatchExecution fails to associated metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The way this executed-plan-leaf-to-logical-source attribution works is as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source. So did a best-effort attempt to match logical and execution plan leaves when the number of leaves were same. In cases where the number of leaves is different, we just give up and report zero rates. An example where this may happen is as follows.
```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...
val query = streamingInputDF.join(cachedStaticDF).writeStream....
```
In this case, the `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it only has leaf because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves causing the input rates to be computed as zero.
With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So its easy to associate the metrics to the original streaming sources.
In this PR, the solution is as follows. If all the streaming sources in a streaming query as v2 sources, then use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to existing mapping logic.
## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21126 from tdas/SPARK-24050.
## What changes were proposed in this pull request?
Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the Kafk aMicroBatchReader is created. However, we create dummy KafkaMicroBatchReader to get the schema and immediately stop it. Its better to make the consumer creation lazy, it will be created on the first attempt to fetch offsets using the KafkaOffsetReader.
## How was this patch tested?
Existing unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21134 from tdas/SPARK-24056.
## What changes were proposed in this pull request?
Kafka partitions can be revoked when new consumers joined in the consumer group to rebalance the partitions. But current Spark Kafka connector code makes sure there's no partition revoking scenarios, so trying to get latest offset from revoked partitions will throw exceptions as JIRA mentioned.
Partition revoking happens when new consumer joined the consumer group, which means different streaming apps are trying to use same group id. This is fundamentally not correct, different apps should use different consumer group. So instead of throwing an confused exception from Kafka, improve the exception message by identifying revoked partition and directly throw an meaningful exception when partition is revoked.
Besides, this PR also fixes bugs in `DirectKafkaWordCount`, this example simply cannot be worked without the fix.
```
8/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, kssh-1] for group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined group use_a_separate_group_id_for_each_stream with generation 4
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group use_a_separate_group_id_for_each_stream
```
## How was this patch tested?
This is manually verified in local cluster, unfortunately I'm not sure how to simulate it in UT, so propose the PR without UT added.
Author: jerryshao <sshao@hortonworks.com>
Closes#21038 from jerryshao/SPARK-22968.
## What changes were proposed in this pull request?
There was no check on nullability for arguments of `Tuple`s. This could lead to have weird behavior when a null value had to be deserialized into a non-nullable Scala object: in those cases, the `null` got silently transformed in a valid value (like `-1` for `Int`), corresponding to the default value we are using in the SQL codebase. This situation was very likely to happen when deserializing to a Tuple of primitive Scala types (like Double, Int, ...).
The PR adds the `AssertNotNull` to arguments of tuples which have been asked to be converted to non-nullable types.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#20976 from mgaido91/SPARK-23835.
## What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Nolan Emirot <nolan@turo.com>
Closes#20990 from emirot/kinesis_stream_example_typo.
## What changes were proposed in this pull request?
This PR implemented the following cleanups related to `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`
## How was this patch tested?
Tested by existing UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#20850 from kiszk/SPARK-23713.
## What changes were proposed in this pull request?
Add `spark.streaming.backpressure.initialRate` to direct Kafka Streams for Kafka 0.8 and 0.10
This is required in order to be able to use backpressure with huge lags, which cannot be processed at once. Without this parameter `DirectKafkaInputDStream` with backpressure enabled would try to get all the possible data from Kafka before adjusting consumption rate
## How was this patch tested?
- Tests added to `org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala` and `org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala`
- Manual tests on YARN cluster
Author: akonopko <alex.konopko@teamaol.com>
Author: Alexander Konopko <alexander.konopko@gmail.com>
Closes#19431 from akonopko/SPARK-18580-initialrate.
## What changes were proposed in this pull request?
CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.
Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.
Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.
This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
- Cached consumer - this should be returned to the pool at task end
- Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.
This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.
## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#20767 from tdas/SPARK-23623.
## What changes were proposed in this pull request?
Omit rounding of backpressure rate. Effects:
- no batch with large number of records is created when rate from PID estimator is one
- the number of records per batch and partition is more fine-grained improving backpressure accuracy
## How was this patch tested?
This was tested by running:
- `mvn test -pl external/kafka-0-8`
- `mvn test -pl external/kafka-0-10`
- a streaming application which was suffering from the issue
JasonMWhite
The contribution is my original work and I license the work to the project under the project’s open source license
Author: Sebastian Arzt <sebastian.arzt@plista.com>
Closes#17774 from arzt/kafka-back-pressure.
## What changes were proposed in this pull request?
As discussion in #20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing.
## How was this patch tested?
Existing UT.
Author: Yuanjian Li <xyliyuanjian@gmail.com>
Closes#20689 from xuanyuanking/SPARK-23533.
## What changes were proposed in this pull request?
Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode.
I considered making a separate streaming interface and adding the epoch ID only to that one, but I think it requires a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query.
## How was this patch tested?
existing unit tests
Author: Jose Torres <jose@databricks.com>
Closes#20710 from jose-torres/api2.
## What changes were proposed in this pull request?
Currently, when the Kafka source reads from Kafka, it generates as many tasks as the number of partitions in the topic(s) to be read. In some case, it may be beneficial to read the data with greater parallelism, that is, with more number partitions/tasks. That means, offset ranges must be divided up into smaller ranges such the number of records in partition ~= total records in batch / desired partitions. This would also balance out any data skews between topic-partitions.
In this patch, I have added a new option called `minPartitions`, which allows the user to specify the desired level of parallelism.
## How was this patch tested?
New tests in KafkaMicroBatchV2SourceSuite.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#20698 from tdas/SPARK-23541.
## What changes were proposed in this pull request?
Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).
## How was this patch tested?
Added new unit test
justinrmiller has been testing this branch in production for a few weeks
Author: cody koeninger <cody@koeninger.org>
Closes#20572 from koeninger/SPARK-17147.
## What changes were proposed in this pull request?
var `KafkaContinuousReader.knownPartitions` should be threadsafe as it is accessed from multiple threads - the query thread at the time of reader factory creation, and the epoch tracking thread at the time of `needsReconfiguration`.
## How was this patch tested?
Existing tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#20655 from tdas/SPARK-23484.