[SPARK-17731][SQL][STREAMING][FOLLOWUP] Refactored StreamingQueryListener APIs

## What changes were proposed in this pull request?

As per rxin's request, here are further API changes:
- Renamed the `Query(Started/Progress/Terminated)` event classes to `Query*Event`
- Renamed the parameters of the `StreamingQueryListener.on***` callbacks from `query*` to `event`

## How was this patch tested?
Existing unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15530 from tdas/SPARK-17731-1.
This commit is contained in:
Tathagata Das 2016-10-18 17:32:16 -07:00
parent 1e35e96930
commit 941b3f9aca
7 changed files with 41 additions and 30 deletions

View file

@ -68,6 +68,15 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),

View file

@ -165,7 +165,7 @@ class StreamExecution(
new Path(new Path(checkpointRoot), name).toUri.toString
/**
* Starts the execution. This returns only after the thread has started and [[QueryStarted]] event
* Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
* has been posted to all the listeners.
*/
def start(): Unit = {
@ -177,9 +177,10 @@ class StreamExecution(
/**
* Repeatedly attempts to run batches as data arrives.
*
* Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted
* such that listeners are guaranteed to get a start event before a termination. Furthermore, this
* method also ensures that [[QueryStarted]] event is posted before the `start()` method returns.
* Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are
* posted such that listeners are guaranteed to get a start event before a termination.
* Furthermore, this method also ensures that [[QueryStartedEvent]] is posted before the
* `start()` method returns.
*/
private def runBatches(): Unit = {
try {
@ -190,7 +191,7 @@ class StreamExecution(
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
updateStatus()
postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception.
postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception.
// Unblock starting thread
startLatch.countDown()
@ -232,7 +233,7 @@ class StreamExecution(
// Update metrics and notify others
streamMetrics.reportTriggerFinished()
updateStatus()
postEvent(new QueryProgress(currentStatus))
postEvent(new QueryProgressEvent(currentStatus))
isTerminated
})
} catch {
@ -260,7 +261,7 @@ class StreamExecution(
// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}

View file

@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
*/
def post(event: StreamingQueryListener.Event) {
event match {
case s: QueryStarted =>
case s: QueryStartedEvent =>
postToAll(s)
case _ =>
sparkListenerBus.post(event)
@ -59,11 +59,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
listener: StreamingQueryListener,
event: StreamingQueryListener.Event): Unit = {
event match {
case queryStarted: QueryStarted =>
case queryStarted: QueryStartedEvent =>
listener.onQueryStarted(queryStarted)
case queryProgress: QueryProgress =>
case queryProgress: QueryProgressEvent =>
listener.onQueryProgress(queryProgress)
case queryTerminated: QueryTerminated =>
case queryTerminated: QueryTerminatedEvent =>
listener.onQueryTerminated(queryTerminated)
case _ =>
}

View file

@ -41,7 +41,7 @@ abstract class StreamingQueryListener {
* don't block this method as it will block your query.
* @since 2.0.0
*/
def onQueryStarted(queryStarted: QueryStarted): Unit
def onQueryStarted(event: QueryStartedEvent): Unit
/**
* Called when there is some status update (ingestion rate updated, etc.)
@ -49,16 +49,16 @@ abstract class StreamingQueryListener {
* @note This method is asynchronous. The status in [[StreamingQuery]] will always be
* latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
* may be changed before/when you process the event. E.g., you may find [[StreamingQuery]]
* is terminated when you are processing [[QueryProgress]].
* is terminated when you are processing [[QueryProgressEvent]].
* @since 2.0.0
*/
def onQueryProgress(queryProgress: QueryProgress): Unit
def onQueryProgress(event: QueryProgressEvent): Unit
/**
* Called when a query is stopped, with or without error.
* @since 2.0.0
*/
def onQueryTerminated(queryTerminated: QueryTerminated): Unit
def onQueryTerminated(event: QueryTerminatedEvent): Unit
}
@ -84,7 +84,7 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event
class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
/**
* :: Experimental ::
@ -92,7 +92,7 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event
class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
/**
* :: Experimental ::
@ -104,7 +104,7 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
class QueryTerminated private[sql](
class QueryTerminatedEvent private[sql](
val queryStatus: StreamingQueryStatus,
val exception: Option[String]) extends Event
}

View file

@ -684,20 +684,20 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
override def onQueryStarted(queryStarted: QueryStarted): Unit = {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
asyncTestWaiter {
startStatus = queryStarted.queryStatus
}
}
override def onQueryProgress(queryProgress: QueryProgress): Unit = {
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryProgress called before onQueryStarted")
synchronized { progressStatuses += queryProgress.queryStatus }
}
}
override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
terminationStatus = queryTerminated.queryStatus

View file

@ -177,30 +177,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
test("QueryStarted serialization") {
val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus)
val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
val json = JsonProtocol.sparkEventToJson(queryStarted)
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryStarted]
.asInstanceOf[StreamingQueryListener.QueryStartedEvent]
assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
}
test("QueryProgress serialization") {
val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus)
val queryProcess = new StreamingQueryListener.QueryProgressEvent(
StreamingQueryStatus.testStatus)
val json = JsonProtocol.sparkEventToJson(queryProcess)
val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryProgress]
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
}
test("QueryTerminated serialization") {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
StreamingQueryStatus.testStatus,
Some(exception.getMessage))
val json =
JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryTerminated]
.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}

View file

@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
// A StreamingQueryListener that gets the query status after the first completed trigger
val listener = new StreamingQueryListener {
@volatile var firstStatus: StreamingQueryStatus = null
override def onQueryStarted(queryStarted: QueryStarted): Unit = { }
override def onQueryProgress(queryProgress: QueryProgress): Unit = {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
}
override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { }
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
}
try {