[SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
## What changes were proposed in this pull request?

Kafka-related Spark parameters have to start with `spark.kafka.` and not with `spark.sql.`. Because of this I've renamed `spark.sql.kafkaConsumerCache.capacity` to `spark.kafka.consumer.cache.capacity`. Since Kafka consumer caching was not documented, I've added documentation for it as well.

## How was this patch tested?

Existing and newly added unit tests, plus:

```
cd docs
SKIP_API=1 jekyll build
```

and a manual webpage check.

Closes #24590 from gaborgsomogyi/SPARK-27687.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Commit efa303581a (parent d14e2d7874)
core/src/main/scala/org/apache/spark/SparkConf.scala

```
@@ -714,7 +714,9 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
     KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
       AlternateConfig("spark.yarn.access.namenodes", "2.2"),
-      AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
+      AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0")),
+    "spark.kafka.consumer.cache.capacity" -> Seq(
+      AlternateConfig("spark.sql.kafkaConsumerCache.capacity", "3.0"))
   )

   /**
```
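For context, `AlternateConfig` makes `SparkConf` fall back from the new key to a registered deprecated key on read, logging a deprecation warning along the way. A minimal sketch of the behavior this hunk enables (a standalone illustration, not code from this PR):

```scala
import org.apache.spark.SparkConf

// A job still setting the pre-3.0 key...
val conf = new SparkConf().set("spark.sql.kafkaConsumerCache.capacity", "128")

// ...is served transparently when the renamed key is read: the lookup
// falls back to the registered alternate (and logs a deprecation warning).
assert(conf.get("spark.kafka.consumer.cache.capacity") === "128")
```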
docs/structured-streaming-kafka-integration.md

```
@@ -416,6 +416,24 @@ The following configurations are optional:
 </tr>
 </table>

+### Consumer Caching
+
+It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
+Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information:
+* Topic name
+* Topic partition
+* Group ID
+
+The size of the cache is limited by <code>spark.kafka.consumer.cache.capacity</code> (default: 64).
+If this threshold is reached, it tries to remove the least-used entry that is currently not in use.
+If it cannot be removed, then the cache will keep growing. In the worst case, the cache will grow to
+the max number of concurrent tasks that can run in the executor (that is, the number of task slots),
+after which it will never shrink.
+
+If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
+At the same time, the cached Kafka consumer which was used in the failed execution will be invalidated. It has to
+be emphasized that it will not be closed if any other task is using it.
+
 ## Writing Data to Kafka

 Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
```
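Since each (topic, partition, group ID) triple occupies its own cache slot, one way to pick a capacity is to match the number of topic partitions the job subscribes to. A minimal sketch, assuming a hypothetical application (the broker address, topic names, and partition counts are illustrative, not part of this PR):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical workload: 2 topics x 50 partitions, one consumer group,
// i.e. at most 100 distinct cache keys.
val subscribedPartitions = 2 * 50

val spark = SparkSession.builder()
  .appName("kafka-consumer-cache-sizing")
  // Raise the soft limit above the default of 64 so that steady-state
  // processing never evicts a consumer it is about to reuse.
  .config("spark.kafka.consumer.cache.capacity", subscribedPartitions.toString)
  .getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // illustrative address
  .option("subscribe", "topicA,topicB")               // illustrative topics
  .load()
```

A capacity well below the number of assigned partitions would make the pool churn consumers between micro-batches, which is exactly the initialization cost the cache is meant to avoid.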
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala

```
@@ -33,8 +33,9 @@ package object kafka010 { // scalastyle:ignore
     .createWithDefaultString("10m")

   private[kafka010] val CONSUMER_CACHE_CAPACITY =
-    ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
-      .doc("The size of consumers cached.")
+    ConfigBuilder("spark.kafka.consumer.cache.capacity")
+      .doc("The maximum number of consumers cached. Please note it's a soft limit" +
+        " (check Structured Streaming Kafka integration guide for further details).")
       .intConf
       .createWithDefault(64)
 }
```
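The `.intConf` call gives the entry a typed accessor, so code inside the `kafka010` package can read the capacity without string keys or manual parsing. An illustrative sketch (not a snippet from this PR):

```scala
// Assumed to live inside org.apache.spark.sql.kafka010, where the
// private[kafka010] CONSUMER_CACHE_CAPACITY entry is visible.
// conf.get(entry) returns an Int because the builder declared .intConf,
// falling back to the default of 64 when the key is unset.
def cacheCapacity(conf: org.apache.spark.SparkConf): Int =
  conf.get(CONSUMER_CACHE_CAPACITY)
```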
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala (new file, 30 lines)
```
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.util.ResetSystemProperties
+
+class KafkaSparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
+  test("deprecated configs") {
+    val conf = new SparkConf()
+
+    conf.set("spark.sql.kafkaConsumerCache.capacity", "32")
+    assert(conf.get(CONSUMER_CACHE_CAPACITY) === 32)
+  }
+}
```