## What changes were proposed in this pull request?

This patch pools both Kafka consumers and fetched data. The overall benefits of the patch are the following:

* Both pools support eviction of idle objects, which helps close stale objects whose topic or partition is no longer assigned to any task.
* Each pool can be given its own policy, which helps optimize pooling behavior per pool.
* Multiple tasks can point to the same topic partition with the same group id; the existing code can't handle this, so excess seeks and fetches could happen. This patch handles that case properly.
* It also makes the code always safe to leverage the cache, so the `reuseCache` parameter no longer needs to be maintained.

Moreover, pooling of Kafka consumers is implemented on top of Apache Commons Pool, which brings a couple of additional benefits:

* We can get rid of synchronization on the KafkaDataConsumer object while acquiring and returning an InternalKafkaConsumer.
* We can extract the object-pooling logic out of the class, so that the pool's behavior can be tested easily.
* We can collect various statistics for the object pool, and can also enable JMX for the pool.

(A minimal sketch of this keyed-pool approach is appended at the end of this description.)

FetchedData instances are pooled by a custom pool implementation instead of Apache Commons Pool, because they are keyed by CacheKey as the first key and "desired offset" as the second key, and the "desired offset" changes over time - I haven't found any general pool implementation supporting this. (A sketch of this two-level lookup is also appended below.)

This patch adds a new dependency, Apache Commons Pool 2.6.0, to the `spark-sql-kafka-0-10` module.

## How was this patch tested?

Existing unit tests as well as new tests for the object pools.

Also ran an experiment to demonstrate concurrent access of consumers for the same topic partition:

* Changed both sides (master and patch) to log whenever a Kafka consumer is created or records are fetched from Kafka.
* branches
  * master: https://github.com/HeartSaVioR/spark/tree/SPARK-25151-master-ref-debugging
  * patch: https://github.com/HeartSaVioR/spark/tree/SPARK-25151-debugging
* Test query (doing a self-join)
  * https://gist.github.com/HeartSaVioR/d831974c3f25c02846f4b15b8d232cc2
* Ran the query from spark-shell with `local[*]` to maximize the chance of concurrent access.
* Collected the count of created Kafka consumers via: `grep "creating new Kafka consumer" logfile | wc -l`
* Collected the count of fetch requests to Kafka via: `grep "fetching data from Kafka consumer" logfile | wc -l`

Topic and data distribution is as follows:

```
truck_speed_events_stream_spark_25151_v1:0:99440
truck_speed_events_stream_spark_25151_v1:1:99489
truck_speed_events_stream_spark_25151_v1:2:397759
truck_speed_events_stream_spark_25151_v1:3:198917
truck_speed_events_stream_spark_25151_v1:4:99484
truck_speed_events_stream_spark_25151_v1:5:497320
truck_speed_events_stream_spark_25151_v1:6:99430
truck_speed_events_stream_spark_25151_v1:7:397887
truck_speed_events_stream_spark_25151_v1:8:397813
truck_speed_events_stream_spark_25151_v1:9:0
```

The experiment only used the four smallest partitions (0, 1, 4, 6) so the query would finish earlier.

The result of the experiment:

branch | create Kafka consumer | fetch request
-- | -- | --
master | 1986 | 2837
patch | 8 | 1706

Closes #22138 from HeartSaVioR/SPARK-25151.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: Jungtaek Lim <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
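
For illustration, here is a minimal sketch of a keyed consumer pool built on the Apache Commons Pool 2.6 API along the lines described above. The `CacheKey`, `ConsumerObjectFactory`, and `buildPool` names, as well as the configuration values, are hypothetical simplifications for this sketch, not Spark's actual classes.

```scala
import java.{util => ju}

import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Pool key: tasks sharing the same group id and topic partition reuse a consumer.
case class CacheKey(groupId: String, topicPartition: TopicPartition)

// Tells the pool how to create, wrap, and destroy pooled consumers.
class ConsumerObjectFactory(kafkaParams: ju.Map[String, Object])
  extends BaseKeyedPooledObjectFactory[CacheKey, KafkaConsumer[Array[Byte], Array[Byte]]] {

  override def create(key: CacheKey): KafkaConsumer[Array[Byte], Array[Byte]] = {
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    consumer.assign(ju.Arrays.asList(key.topicPartition))
    consumer
  }

  override def wrap(consumer: KafkaConsumer[Array[Byte], Array[Byte]])
      : PooledObject[KafkaConsumer[Array[Byte], Array[Byte]]] =
    new DefaultPooledObject(consumer)

  override def destroyObject(
      key: CacheKey,
      pooled: PooledObject[KafkaConsumer[Array[Byte], Array[Byte]]]): Unit =
    pooled.getObject.close()
}

object ConsumerPoolSketch {
  def buildPool(kafkaParams: ju.Map[String, Object])
      : GenericKeyedObjectPool[CacheKey, KafkaConsumer[Array[Byte], Array[Byte]]] = {
    val config = new GenericKeyedObjectPoolConfig[KafkaConsumer[Array[Byte], Array[Byte]]]
    config.setMaxTotal(64)                                // cap on pooled consumers overall
    config.setMinEvictableIdleTimeMillis(5 * 60 * 1000L)  // evict consumers idle for 5 minutes
    config.setTimeBetweenEvictionRunsMillis(60 * 1000L)   // run the evictor every minute
    config.setJmxEnabled(true)                            // expose pool statistics via JMX
    new GenericKeyedObjectPool(new ConsumerObjectFactory(kafkaParams), config)
  }
}
```

Callers borrow a consumer for the duration of a fetch and then return it, e.g. `val c = pool.borrowObject(key)` followed by `pool.returnObject(key, c)` in a `finally` block; a broken consumer would instead be discarded via `pool.invalidateObject(key, c)`.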
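
Similarly, a rough sketch of the two-level lookup described above for pooling fetched data, assuming a simplified `FetchedData` that only tracks its next offset; this `FetchedDataPool` is a hypothetical simplification, not the class added by this patch.

```scala
import scala.collection.mutable

// Simplified stand-in for the real FetchedData: only the next offset to fetch matters here.
case class FetchedData(nextOffsetToFetch: Long)

// First-level key is the cache key (e.g. group id + topic partition); the second-level
// lookup matches the caller's desired offset against the cached data's next offset.
class FetchedDataPool[K] {
  private val cache = mutable.Map.empty[K, mutable.ListBuffer[FetchedData]]

  // Reuse cached data only when its next offset equals the desired offset;
  // otherwise the caller falls back to creating fresh FetchedData.
  def acquire(key: K, desiredOffset: Long): Option[FetchedData] = synchronized {
    val entries = cache.getOrElseUpdate(key, mutable.ListBuffer.empty)
    val matched = entries.find(_.nextOffsetToFetch == desiredOffset)
    matched.foreach(entries -= _)
    matched
  }

  // Return the data to the pool once the task is done with it.
  def release(key: K, data: FetchedData): Unit = synchronized {
    cache.getOrElseUpdate(key, mutable.ListBuffer.empty) += data
  }
}
```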