b0a5cd8909
## What changes were proposed in this pull request? Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka. ### Streaming Kafka Sink - When addBatch is called -- If batchId is great than the last written batch --- Write batch to Kafka ---- Topic will be taken from the record, if present, or from a topic option, which overrides topic in record. -- Else ignore ### Batch Kafka Sink - KafkaSourceProvider will implement CreatableRelationProvider - CreatableRelationProvider#createRelation will write the passed in Dataframe to a Kafka - Topic will be taken from the record, if present, or from topic option, which overrides topic in record. - Save modes Append and ErrorIfExist supported under identical semantics. Other save modes result in an AnalysisException tdas zsxwing ## How was this patch tested? ### The following unit tests will be included - write to stream with topic field: valid stream write with data that includes an existing topic in the schema - write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic - write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field - write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers. - write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics. - write batch to kafka: simple write batch to Kafka, which goes through the same code path as streaming scenario, so validity checks will not be redone here. ### Examples ```scala // Structured Streaming val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value") .selectExpr("value as key", "value as value") .writeStream .format("kafka") .option("checkpointLocation", checkpointDir) .outputMode(OutputMode.Append) .option("kafka.bootstrap.servers", brokerAddress) .option("topic", topic) .queryName("kafkaStream") .start() // Batch val df = spark .sparkContext .parallelize(Seq("1", "2", "3", "4", "5")) .map(v => (topic, v)) .toDF("topic", "value") df.write .format("kafka") .option("kafka.bootstrap.servers",brokerAddress) .option("topic", topic) .save() ``` Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tyson Condie <tcondie@gmail.com> Closes #17043 from tcondie/kafka-writer. |
||
---|---|---|
.. | ||
docker | ||
docker-integration-tests | ||
flume | ||
flume-assembly | ||
flume-sink | ||
kafka-0-8 | ||
kafka-0-8-assembly | ||
kafka-0-10 | ||
kafka-0-10-assembly | ||
kafka-0-10-sql | ||
kinesis-asl | ||
kinesis-asl-assembly | ||
spark-ganglia-lgpl |