spark-instrumented-optimizer/external
Jungtaek Lim (HeartSaVioR) 4513f1c0dc [SPARK-26848][SQL][SS] Introduce new option to Kafka source: offset by timestamp (starting/ending)
## What changes were proposed in this pull request?

This patch introduces new options "startingOffsetsByTimestamp" and "endingOffsetsByTimestamp" to set specific timestamp per topic (since we're unlikely to set the different value per partition) to let source starts reading from offsets which have equal of greater timestamp, and ends reading until offsets which have equal of greater timestamp.

The new option would be optional of course, and take preference over existing offset options.

## How was this patch tested?

New unit tests added. Also manually tested basic functionality with Kafka 2.0.0 server.

Running query below

```
val df = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "spark_26848_test_v1,spark_26848_test_2_v1")
  .option("startingOffsetsByTimestamp", """{"spark_26848_test_v1": 1549669142193, "spark_26848_test_2_v1": 1549669240965}""")
  .option("endingOffsetsByTimestamp", """{"spark_26848_test_v1": 1549669265676, "spark_26848_test_2_v1": 1549699265676}""")
  .load().selectExpr("CAST(value AS STRING)")

df.show()
```

with below records (one string which number part remarks when they're put after such timestamp) in

topic `spark_26848_test_v1`
```
hello1 1549669142193
world1 1549669142193
hellow1 1549669240965
world1 1549669240965
hello1 1549669265676
world1 1549669265676
```

topic `spark_26848_test_2_v1`

```
hello2 1549669142193
world2 1549669142193
hello2 1549669240965
world2 1549669240965
hello2 1549669265676
world2 1549669265676
```

the result of `df.show()` follows:
```
+--------------------+
|               value|
+--------------------+
|world1 1549669240965|
|world1 1549669142193|
|world2 1549669240965|
|hello2 1549669240965|
|hellow1 154966924...|
|hello2 1549669265676|
|hello1 1549669142193|
|world2 1549669265676|
+--------------------+
```

Note that endingOffsets (as well as endingOffsetsByTimestamp) are exclusive.

Closes #23747 from HeartSaVioR/SPARK-26848.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-23 19:25:36 -05:00
..
avro [SPARK-29141][SQL][TEST] Use SqlBasedBenchmark in SQL benchmarks 2019-09-18 17:52:23 -07:00
docker [SPARK-28683][BUILD] Upgrade Scala to 2.12.10 2019-09-18 13:30:36 -07:00
docker-integration-tests [SPARK-28744][SQL][TEST] rename SharedSQLContext to SharedSparkSession 2019-08-19 19:01:56 +08:00
kafka-0-10 [SPARK-29007][STREAMING][MLLIB][TESTS] Enforce not leaking SparkContext in tests which creates new StreamingContext with new SparkContext 2019-09-11 10:29:13 -07:00
kafka-0-10-assembly [SPARK-25956] Make Scala 2.12 as default Scala version in Spark 3.0 2018-11-14 16:22:23 -08:00
kafka-0-10-sql [SPARK-26848][SQL][SS] Introduce new option to Kafka source: offset by timestamp (starting/ending) 2019-09-23 19:25:36 -05:00
kafka-0-10-token-provider [SPARK-28928][SS] Use Kafka delegation token protocol on sources/sinks 2019-09-09 15:41:51 -07:00
kinesis-asl [MINOR][DOCS] Fix few typos in the java docs 2019-09-12 09:30:03 +09:00
kinesis-asl-assembly [SPARK-25956] Make Scala 2.12 as default Scala version in Spark 3.0 2018-11-14 16:22:23 -08:00
spark-ganglia-lgpl [SPARK-25956] Make Scala 2.12 as default Scala version in Spark 3.0 2018-11-14 16:22:23 -08:00