From 941b3f9aca59e62c078508a934f8c2221ced96ce Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Oct 2016 17:32:16 -0700 Subject: [PATCH] [SPARK-17731][SQL][STREAMING][FOLLOWUP] Refactored StreamingQueryListener APIs ## What changes were proposed in this pull request? As per rxin request, here are further API changes - Changed `Stream(Started/Progress/Terminated)` events to `Stream*Event` - Changed the fields in `StreamingQueryListener.on***` from `query*` to `event` ## How was this patch tested? Existing unit tests. Author: Tathagata Das Closes #15530 from tdas/SPARK-17731-1. --- project/MimaExcludes.scala | 9 +++++++++ .../sql/execution/streaming/StreamExecution.scala | 15 ++++++++------- .../streaming/StreamingQueryListenerBus.scala | 8 ++++---- .../sql/streaming/StreamingQueryListener.scala | 14 +++++++------- .../apache/spark/sql/streaming/StreamTest.scala | 6 +++--- .../streaming/StreamingQueryListenerSuite.scala | 13 +++++++------ .../spark/sql/streaming/StreamingQuerySuite.scala | 6 +++--- 7 files changed, 41 insertions(+), 30 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1349af4219..facf034ea7 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -68,6 +68,15 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"), // [SPARK-17338][SQL] add global temp view ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9144736c94..ba8cf808e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -165,7 +165,7 @@ class StreamExecution( new Path(new Path(checkpointRoot), name).toUri.toString /** - * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event + * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]] * has been posted to all the listeners. */ def start(): Unit = { @@ -177,9 +177,10 @@ class StreamExecution( /** * Repeatedly attempts to run batches as data arrives. * - * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted - * such that listeners are guaranteed to get a start event before a termination. Furthermore, this - * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns. + * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are + * posted such that listeners are guaranteed to get a start event before a termination. + * Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the + * `start()` method returns. */ private def runBatches(): Unit = { try { @@ -190,7 +191,7 @@ class StreamExecution( sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } updateStatus() - postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception. + postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception. // Unblock starting thread startLatch.countDown() @@ -232,7 +233,7 @@ class StreamExecution( // Update metrics and notify others streamMetrics.reportTriggerFinished() updateStatus() - postEvent(new QueryProgress(currentStatus)) + postEvent(new QueryProgressEvent(currentStatus)) isTerminated }) } catch { @@ -260,7 +261,7 @@ class StreamExecution( // Notify others sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( - new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString))) + new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString))) terminationLatch.countDown() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 1e663956f9..fc2190d39d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) */ def post(event: StreamingQueryListener.Event) { event match { - case s: QueryStarted => + case s: QueryStartedEvent => postToAll(s) case _ => sparkListenerBus.post(event) @@ -59,11 +59,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) listener: StreamingQueryListener, event: StreamingQueryListener.Event): Unit = { event match { - case queryStarted: QueryStarted => + case queryStarted: QueryStartedEvent => listener.onQueryStarted(queryStarted) - case queryProgress: QueryProgress => + case queryProgress: QueryProgressEvent => listener.onQueryProgress(queryProgress) - case queryTerminated: QueryTerminated => + case queryTerminated: QueryTerminatedEvent => listener.onQueryTerminated(queryTerminated) case _ => } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 69790e33b2..9e311fae84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -41,7 +41,7 @@ abstract class StreamingQueryListener { * don't block this method as it will block your query. * @since 2.0.0 */ - def onQueryStarted(queryStarted: QueryStarted): Unit + def onQueryStarted(event: QueryStartedEvent): Unit /** * Called when there is some status update (ingestion rate updated, etc.) @@ -49,16 +49,16 @@ abstract class StreamingQueryListener { * @note This method is asynchronous. The status in [[StreamingQuery]] will always be * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]] * may be changed before/when you process the event. E.g., you may find [[StreamingQuery]] - * is terminated when you are processing [[QueryProgress]]. + * is terminated when you are processing [[QueryProgressEvent]]. * @since 2.0.0 */ - def onQueryProgress(queryProgress: QueryProgress): Unit + def onQueryProgress(event: QueryProgressEvent): Unit /** * Called when a query is stopped, with or without error. * @since 2.0.0 */ - def onQueryTerminated(queryTerminated: QueryTerminated): Unit + def onQueryTerminated(event: QueryTerminatedEvent): Unit } @@ -84,7 +84,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event /** * :: Experimental :: @@ -92,7 +92,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event /** * :: Experimental :: @@ -104,7 +104,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryTerminated private[sql]( + class QueryTerminatedEvent private[sql]( val queryStatus: StreamingQueryStatus, val exception: Option[String]) extends Event } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 8dfeb8da4b..7428330651 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -684,20 +684,20 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } - override def onQueryStarted(queryStarted: QueryStarted): Unit = { + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { asyncTestWaiter { startStatus = queryStarted.queryStatus } } - override def onQueryProgress(queryProgress: QueryProgress): Unit = { + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryProgress called before onQueryStarted") synchronized { progressStatuses += queryProgress.queryStatus } } } - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryTerminated called before onQueryStarted") terminationStatus = queryTerminated.queryStatus diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 623f66a778..ff843865a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -177,30 +177,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("QueryStarted serialization") { - val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus) + val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryStarted) val newQueryStarted = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryStarted] + .asInstanceOf[StreamingQueryListener.QueryStartedEvent] assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus) } test("QueryProgress serialization") { - val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus) + val queryProcess = new StreamingQueryListener.QueryProgressEvent( + StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryProcess) val newQueryProcess = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryProgress] + .asInstanceOf[StreamingQueryListener.QueryProgressEvent] assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus) } test("QueryTerminated serialization") { val exception = new RuntimeException("exception") - val queryQueryTerminated = new StreamingQueryListener.QueryTerminated( + val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent( StreamingQueryStatus.testStatus, Some(exception.getMessage)) val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryTerminated] + .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus) assert(queryQueryTerminated.exception === newQueryTerminated.exception) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9f8e2db966..92020be978 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStarted): Unit = { } - override def onQueryProgress(queryProgress: QueryProgress): Unit = { + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { if (firstStatus == null) firstStatus = queryProgress.queryStatus } - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { } + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { } } try {