d2ff10cbe1
## What changes were proposed in this pull request?

This PR proposes to add ML events to `Instrumentation` and use them in `Pipeline`, so that other developers can track these events and act on them.

## Introduction

ML events (like SQL events) can be quite useful when people want to track ML operations and take actions in response. For instance, I have been working on integrating Apache Spark with [Apache Atlas](https://atlas.apache.org/QuickStart.html). With some custom changes on top of this PR, I can visualise an ML pipeline as below:

![spark_ml_streaming_lineage](https://user-images.githubusercontent.com/6477701/49682779-394bca80-faf5-11e8-85b8-5fae28b784b3.png)

Another benefit worth considering is that these events can be combined with other SQL/Streaming events, for instance to see where the input `Dataset` originated. With current Apache Spark, I can already visualise SQL operations as below:

![screen shot 2018-12-10 at 9 41 36 am](https://user-images.githubusercontent.com/6477701/49706269-d9bdfe00-fc5f-11e8-943a-3309d1856ba5.png)

I think we can combine those existing lineages to easily understand where the data comes from and where it goes. Currently the ML side is a hole, so the lineages can't be connected in current Apache Spark.

It goes without saying how useful it is to track SQL/Streaming operations. Likewise, I would like to propose ML events as well (as lowest-stability `Unstable` APIs for now, with no guarantee about stability).

## Implementation Details

### Sends events (but does not expose an ML-specific listener)

**`mllib/src/main/scala/org/apache/spark/ml/events.scala`**

```scala
@Unstable
case class ...StartEvent(caller, input)

@Unstable
case class ...EndEvent(caller, output)

trait MLEvents {
  // Wrappers to send events:
  // def with...Event(body) = {
  //   body()
  //   SparkContext.getOrCreate().listenerBus.post(event)
  // }
}
```

This trait is used by `Instrumentation`.

```scala
class Instrumentation ...
  with MLEvents {
```

and used as below:

```scala
instrumented { instr =>
  instr.with...Event(...) {
    ...
  }
}
```

This approach mimics both of the following:

**1. Catalog events (see `org/apache/spark/sql/catalyst/catalog/events.scala`)**

- This allows a Catalog-specific listener, `ExternalCatalogEventListener`, to be added.
- It's implemented by wrapping the whole `ExternalCatalog` in `ExternalCatalogWithListener`, which delegates the operations to `ExternalCatalog`.

This is not quite possible in this case because most instances (like `Pipeline`) are created directly by users in most cases. We might be able to do that by extending `ListenerBus` for all possible instances, but IMHO that's too invasive. Also, exposing another ML-specific listener sounds a bit too much at this stage. Therefore, I simply borrowed the file name and structure here.

**2. SQL execution events (see `org/apache/spark/sql/execution/SQLExecution.scala`)**

- Add an object that wraps a body to send events.

The current approach is rather close to this. It has a `with...` wrapper to send events. I borrowed this approach to be consistent.

## Usage

It needs a custom `SparkListener` implementation. For instance, with the custom listener below:

```scala
import org.apache.spark.ml.MLEvent
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

class CustomMLListener extends SparkListener {
  // ML events are delivered through the generic onOtherEvent callback.
  override def onOtherEvent(e: SparkListenerEvent): Unit = e match {
    case e: MLEvent => // do something
    case _ => // pass
  }
}
```

There are two (existing) ways to use this.

```scala
spark.sparkContext.addSparkListener(new CustomMLListener)
```

```bash
spark-submit ...\
  --conf spark.extraListeners=CustomMLListener\
  ...
```

This is also similar to the existing implementations on the SQL side.

## Target users

1. I think people in general would likely use this feature like other event listeners. At least, I can see some interest outside the project:
   - SQL Listener
     - https://stackoverflow.com/questions/46409339/spark-listener-to-an-sql-query
     - http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-Custom-Query-Execution-listener-via-conf-properties-td30979.html
   - Streaming Query Listener
     - https://jhui.github.io/2017/01/15/Apache-Spark-Streaming/
     - http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Watermark-td25413.html#a25416
2. Some would likely use this via Atlas. The plugin mirror is intentionally exposed at [spark-atlas-connector](https://github.com/hortonworks-spark/spark-atlas-connector) so that anyone can build on it for lineage and governance in Atlas. I'm trying to show integrated lineages in Apache Spark, but the ML side is the missing piece.

## How was this patch tested?

Manually tested, and unit tests were added.

Closes #23263 from HyukjinKwon/SPARK-23674-1.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
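As an illustration of the `with...Event` wrapper pattern described under Implementation Details, here is a minimal sketch in plain Scala with no Spark dependency. Every name in it (`DemoEvent`, `FitStart`, `FitEnd`, `DemoBus`, `DemoInstrumentation`, `withFitEvent`) is hypothetical and does not come from the patch; the toy bus is synchronous, whereas Spark's listener bus is not, and posting the start event before running the body is an assumption based on the start/end event pair above.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for the start/end event pair; not Spark classes.
sealed trait DemoEvent
final case class FitStart(caller: String, input: String) extends DemoEvent
final case class FitEnd(caller: String, output: String) extends DemoEvent

// Toy synchronous bus: each posted event is handed to every listener in order.
final class DemoBus {
  private val listeners = ListBuffer.empty[DemoEvent => Unit]
  def addListener(listener: DemoEvent => Unit): Unit = { listeners += listener; () }
  def post(event: DemoEvent): Unit = listeners.foreach(l => l(event))
}

// Wrapper in the spirit of `with...Event`:
// post a start event, run the body, then post an end event with the result.
final class DemoInstrumentation(bus: DemoBus) {
  def withFitEvent(caller: String, input: String)(body: => String): String = {
    bus.post(FitStart(caller, input))
    val output = body
    bus.post(FitEnd(caller, output))
    output
  }
}

object Demo {
  // Runs one wrapped operation and returns its result plus the events observed.
  def run(): (String, List[DemoEvent]) = {
    val bus = new DemoBus
    val seen = ListBuffer.empty[DemoEvent]
    bus.addListener { e => seen += e; () }
    val instr = new DemoInstrumentation(bus)
    val out = instr.withFitEvent("pipeline", "dataset") { "model" }
    (out, seen.toList)
  }

  def main(args: Array[String]): Unit = {
    val (out, events) = run()
    println(out)
    events.foreach(println)
  }
}
```

A listener registered on the bus sees the start and end events in order without the wrapped code knowing about any listener, which is the property that lets lineage tools observe ML operations from outside.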
| Name |
|---|
| benchmarks |
| src |
| pom.xml |