[SPARK-10963][STREAMING][KAFKA] make KafkaCluster public

Author: cody koeninger <cody@koeninger.org>

Closes #9007 from koeninger/SPARK-10963.
Committed by Sean Owen on 2016-02-07 12:52:00 +00:00
parent bc8890b357
commit 140ddef373

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -29,15 +29,19 @@ import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}

 import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi

 /**
+ * :: DeveloperApi ::
  * Convenience methods for interacting with a Kafka cluster.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
  * configuration parameters</a>.
  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
-private[spark]
+@DeveloperApi
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
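
With private[spark] replaced by @DeveloperApi, user code can now construct KafkaCluster directly. A minimal sketch, assuming the spark-streaming-kafka artifact on the classpath and its org.apache.spark.streaming.kafka package; the broker list and topic name are illustrative, and getPartitions/getLatestLeaderOffsets are existing methods of this class that this patch does not touch:

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    // Broker list, NOT zookeeper, per the scaladoc above.
    val kc = new KafkaCluster(Map("metadata.broker.list" -> "host1:9092,host2:9092"))

    // Every call returns Either[Err, ...]: Right on success, Left carrying the
    // Throwables accumulated while trying each broker.
    val partitions: Set[TopicAndPartition] =
      kc.getPartitions(Set("events")).fold(
        errs => throw new IllegalStateException(errs.mkString("\n")),
        identity)

    // LeaderOffset (see the companion object below) pairs each partition's
    // leader host and port with the offset itself.
    val latest = kc.getLatestLeaderOffsets(partitions)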
@@ -227,7 +231,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   // this 0 here indicates api version, in this case the original ZK backed api.
   private def defaultConsumerApiVersion: Short = 0

-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
   def getConsumerOffsets(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
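
A hedged sketch of reading a consumer group's committed offsets through the two-argument overload above, which delegates to the api-version variant with defaultConsumerApiVersion; the group id is illustrative, and kc and partitions are as in the earlier sketch:

    // Right: committed offset per partition. Left: KafkaCluster.Err, an
    // ArrayBuffer of the Throwables encountered while trying each broker.
    kc.getConsumerOffsets("example-group", partitions) match {
      case Right(offsets) =>
        offsets.foreach { case (tp, offset) => println(s"$tp is at $offset") }
      case Left(errs) =>
        errs.foreach(t => println(t.getMessage))
    }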
@@ -246,7 +250,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     }
   }

-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
   def getConsumerOffsetMetadata(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
@@ -283,7 +287,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     Left(errs)
   }

-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
   def setConsumerOffsets(
       groupId: String,
       offsets: Map[TopicAndPartition, Long]
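
Committing offsets back is symmetric. A sketch, assuming the Right side maps each partition to a Kafka error code from the underlying OffsetCommitResponse (ErrorMapping.NoError, i.e. 0, on success); topic, partition, and offset values are illustrative:

    import kafka.common.{ErrorMapping, TopicAndPartition}

    val toCommit = Map(TopicAndPartition("events", 0) -> 12345L)

    kc.setConsumerOffsets("example-group", toCommit) match {
      case Right(codes) =>
        // Collect the partitions whose commit came back with a non-zero error code.
        val failed = codes.collect { case (tp, code) if code != ErrorMapping.NoError => tp }
        if (failed.nonEmpty) println("commit failed for: " + failed.mkString(", "))
      case Left(errs) =>
        errs.foreach(t => println(t.getMessage))
    }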
@@ -301,7 +305,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
   }

-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
   def setConsumerOffsetMetadata(
       groupId: String,
       metadata: Map[TopicAndPartition, OffsetAndMetadata]
@@ -359,7 +363,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   }
 }

-private[spark]
+@DeveloperApi
 object KafkaCluster {
   type Err = ArrayBuffer[Throwable]
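
Since every method returns Either[Err, T], callers end up unwrapping results the same way repeatedly. A small hypothetical helper, not part of this patch, that collapses the accumulated broker errors into a single exception:

    import org.apache.spark.SparkException
    import org.apache.spark.streaming.kafka.KafkaCluster.Err

    // Hypothetical convenience: throw one SparkException carrying all broker errors.
    def checkErrors[T](result: Either[Err, T]): T =
      result.fold(
        errs => throw new SparkException(errs.mkString("\n")),
        identity)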
@@ -371,7 +375,6 @@ object KafkaCluster {
     )
   }

-  private[spark]
   case class LeaderOffset(host: String, port: Int, offset: Long)

   /**
@@ -379,7 +382,6 @@ object KafkaCluster {
    * Simple consumers connect directly to brokers, but need many of the same configs.
    * This subclass won't warn about missing ZK params, or presence of broker params.
    */
-  private[spark]
   class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
     extends ConsumerConfig(originalProps) {
     val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
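
The seedBrokers parsing above splits the broker list into (host, port) pairs. With the class no longer private[spark], a sketch of building a config via the companion's apply factory (described in the next hunk); the broker addresses are illustrative:

    import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

    val conf = SimpleConsumerConfig(Map("bootstrap.servers" -> "host1:9092,host2:9092"))

    // Each seed broker parsed from the host1:port1,host2:port2 form.
    conf.seedBrokers.foreach { case (host, port) => println(s"$host:$port") }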
@@ -391,7 +393,6 @@ object KafkaCluster {
     }
   }

-  private[spark]
   object SimpleConsumerConfig {
     /**
      * Make a consumer config without requiring group.id or zookeeper.connect,