[SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable

## What changes were proposed in this pull request?

This PR adds `ContinuousQueryInfo` to make `ContinuousQueryListener` events serializable, so that they can be written to the event log.
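After this change, a user-defined listener receives a serializable `ContinuousQueryInfo` snapshot (query name, source statuses, and sink status, with offsets rendered as strings) instead of a live `ContinuousQuery` handle. Below is a minimal sketch of a listener written against the patched API — the `QueryEventLogger` name and the `println` logging are illustrative only, not part of this patch:

```scala
import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.sql.streaming.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

class QueryEventLogger extends ContinuousQueryListener {

  override def onQueryStarted(queryStarted: QueryStarted): Unit = {
    // queryInfo is a plain data holder, so it is safe to keep or serialize.
    println(s"Query started: ${queryStarted.queryInfo.name}")
  }

  override def onQueryProgress(queryProgress: QueryProgress): Unit = {
    // Offsets are reported as descriptive strings (offsetDesc), not Offset objects.
    val offsets = queryProgress.queryInfo.sourceStatuses.map(_.offsetDesc)
    println(s"Query progress: sources at ${offsets.mkString(", ")}")
  }

  override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
    // exception carries only the message; stackTrace is a Seq[StackTraceElement].
    queryTerminated.exception.foreach(msg => println(s"Query failed: $msg"))
  }
}

// Registration, assuming an active SparkSession named `spark`:
// spark.streams.addListener(new QueryEventLogger)
```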

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13335 from zsxwing/query-info.
Shixiong Zhu authored on 2016-06-07 16:40:03 -07:00, committed by Tathagata Das
commit 0cfd6192f3 (parent 695dbc816a)
8 changed files with 203 additions and 76 deletions

ContinuousQueryListenerBus.scala

@@ -22,15 +22,13 @@ import org.apache.spark.sql.streaming.ContinuousQueryListener
 import org.apache.spark.util.ListenerBus
 /**
- * A bus to forward events to [[ContinuousQueryListener]]s. This one will wrap received
- * [[ContinuousQueryListener.Event]]s as WrappedContinuousQueryListenerEvents and send them to the
- * Spark listener bus. It also registers itself with Spark listener bus, so that it can receive
- * WrappedContinuousQueryListenerEvents, unwrap them as ContinuousQueryListener.Events and
- * dispatch them to ContinuousQueryListener.
+ * A bus to forward events to [[ContinuousQueryListener]]s. This one will send received
+ * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with
+ * Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them
+ * to ContinuousQueryListener.
  */
 class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
-  extends SparkListener
-  with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+  extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
   import ContinuousQueryListener._
@@ -45,13 +43,13 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
       case s: QueryStarted =>
         postToAll(s)
       case _ =>
-        sparkListenerBus.post(new WrappedContinuousQueryListenerEvent(event))
+        sparkListenerBus.post(event)
     }
   }
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
-      case WrappedContinuousQueryListenerEvent(e) =>
+      case e: ContinuousQueryListener.Event =>
         postToAll(e)
       case _ =>
     }
@@ -71,15 +69,4 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
     }
   }
-  /**
-   * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
-   * listener bus.
-   */
-  private case class WrappedContinuousQueryListenerEvent(
-      streamingListenerEvent: ContinuousQueryListener.Event)
-    extends SparkListenerEvent {
-    // Do not log streaming events in event log as history server does not support these events.
-    protected[spark] override def logEvent: Boolean = false
-  }
 }

StreamExecution.scala

@@ -131,12 +131,13 @@ class StreamExecution(
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
     val localAvailableOffsets = availableOffsets
-    sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray
+    sources.map(s =>
+      new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
   }
   /** Returns current status of the sink. */
   override def sinkStatus: SinkStatus =
-    new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources))
+    new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString)
   /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
   override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
@@ -167,7 +168,7 @@ class StreamExecution(
     // Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
     // so must mark this as ACTIVE first.
     state = ACTIVE
-    postEvent(new QueryStarted(this)) // Assumption: Does not throw exception.
+    postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.
     // Unblock starting thread
     startLatch.countDown()
@@ -206,7 +207,10 @@ class StreamExecution(
     } finally {
       state = TERMINATED
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-      postEvent(new QueryTerminated(this))
+      postEvent(new QueryTerminated(
+        this.toInfo,
+        exception.map(_.getMessage),
+        exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
       terminationLatch.countDown()
     }
   }
@@ -374,7 +378,7 @@ class StreamExecution(
     logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
     // Update committed offsets.
     committedOffsets ++= availableOffsets
-    postEvent(new QueryProgress(this))
+    postEvent(new QueryProgress(this.toInfo))
   }
   private def postEvent(event: ContinuousQueryListener.Event) {
@@ -484,6 +488,13 @@ class StreamExecution(
     """.stripMargin
   }
+  private def toInfo: ContinuousQueryInfo = {
+    new ContinuousQueryInfo(
+      this.name,
+      this.sourceStatuses,
+      this.sinkStatus)
+  }
   trait State
   case object INITIALIZED extends State
   case object ACTIVE extends State

ContinuousQueryInfo.scala (new file)

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+import org.apache.spark.annotation.Experimental
+/**
+ * :: Experimental ::
+ * A class used to report information about the progress of a [[ContinuousQuery]].
+ *
+ * @param name The [[ContinuousQuery]] name.
+ * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
+ * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
+ */
+@Experimental
+class ContinuousQueryInfo private[sql](
+    val name: String,
+    val sourceStatuses: Seq[SourceStatus],
+    val sinkStatus: SinkStatus)

ContinuousQueryListener.scala

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.scheduler.SparkListenerEvent
 /**
  * :: Experimental ::
@@ -70,26 +71,43 @@ abstract class ContinuousQueryListener {
 object ContinuousQueryListener {
   /**
-   * Base type of [[ContinuousQueryListener]] events.
+   * :: Experimental ::
+   * Base type of [[ContinuousQueryListener]] events
+   * @since 2.0.0
    */
-  trait Event
+  @Experimental
+  trait Event extends SparkListenerEvent
   /**
-   * Event representing the start of a query.
+   * :: Experimental ::
+   * Event representing the start of a query
+   * @since 2.0.0
    */
-  class QueryStarted private[sql](val query: ContinuousQuery) extends Event
+  @Experimental
+  class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event
   /**
-   * Event representing any progress updates in a query.
+   * :: Experimental ::
+   * Event representing any progress updates in a query
+   * @since 2.0.0
    */
-  class QueryProgress private[sql](val query: ContinuousQuery) extends Event
+  @Experimental
+  class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event
   /**
-   * Event representing that termination of a query.
+   * :: Experimental ::
+   * Event representing that termination of a query
+   *
+   * @param queryInfo Information about the status of the query.
+   * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated
+   *                  with an exception. Otherwise, it will be `None`.
+   * @param stackTrace The stack trace of the exception if the query was terminated with an
+   *                   exception. It will be empty if there was no error.
+   * @since 2.0.0
+   */
-  class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
+  @Experimental
+  class QueryTerminated private[sql](
+      val queryInfo: ContinuousQueryInfo,
+      val exception: Option[String],
+      val stackTrace: Seq[StackTraceElement]) extends Event
 }
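Because `Event` now extends `SparkListenerEvent`, these events travel over the regular Spark listener bus, which is what makes event-log serialization possible. As a hedged sketch of what this enables (the `StreamingEventSniffer` name is hypothetical): a plain `SparkListener` attached to the `SparkContext` can observe `QueryProgress` and `QueryTerminated` via `onOtherEvent` — note that, per the `ContinuousQueryListenerBus` change above, `QueryStarted` is dispatched synchronously and never crosses the Spark bus:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.streaming.ContinuousQueryListener

class StreamingEventSniffer extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: ContinuousQueryListener.QueryTerminated =>
      // exception and stackTrace are the serializable fields added by this patch.
      println(s"Terminated: ${e.queryInfo.name}, error = ${e.exception.getOrElse("none")}")
    case e: ContinuousQueryListener.QueryProgress =>
      println(s"Progress: sink at ${e.queryInfo.sinkStatus.offsetDesc}")
    case _ => // not a streaming event
  }
}

// Registration, assuming a SparkContext named `sc`:
// sc.addSparkListener(new StreamingEventSniffer)
```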

SinkStatus.scala

@@ -18,17 +18,17 @@
 package org.apache.spark.sql.streaming
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Sink}
+import org.apache.spark.sql.execution.streaming.Sink
 /**
  * :: Experimental ::
  * Status and metrics of a streaming [[Sink]].
  *
  * @param description Description of the source corresponding to this status
- * @param offset Current offset up to which data has been written by the sink
+ * @param offsetDesc Description of the current offset up to which data has been written by the sink
  * @since 2.0.0
  */
 @Experimental
 class SinkStatus private[sql](
     val description: String,
-    val offset: Offset)
+    val offsetDesc: String)

SourceStatus.scala

@@ -18,17 +18,17 @@
 package org.apache.spark.sql.streaming
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Source}
+import org.apache.spark.sql.execution.streaming.Source
 /**
  * :: Experimental ::
  * Status and metrics of a streaming [[Source]].
  *
- * @param description Description of the source corresponding to this status
- * @param offset Current offset of the source, if known
+ * @param description Description of the source corresponding to this status
+ * @param offsetDesc Description of the current [[Source]] offset if known
  * @since 2.0.0
  */
 @Experimental
 class SourceStatus private[sql] (
     val description: String,
-    val offset: Option[Offset])
+    val offsetDesc: Option[String])

ContinuousQueryListenerSuite.scala

@@ -26,7 +26,9 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.JsonProtocol
 class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
@@ -51,14 +53,13 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
       Assert("Incorrect query status in onQueryStarted") {
         val status = listener.startStatus
         assert(status != null)
-        assert(status.active == true)
         assert(status.sourceStatuses.size === 1)
         assert(status.sourceStatuses(0).description.contains("Memory"))
         // The source and sink offsets must be None as this must be called before the
         // batches have started
-        assert(status.sourceStatuses(0).offset === None)
-        assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
+        assert(status.sourceStatuses(0).offsetDesc === None)
+        assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString)
         // No progress events or termination events
         assert(listener.progressStatuses.isEmpty)
@@ -73,9 +74,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(listener.progressStatuses.size === 1)
         val status = listener.progressStatuses.peek()
         assert(status != null)
-        assert(status.active == true)
-        assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-        assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+        assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
+        assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
         // No termination events
         assert(listener.terminationStatus === null)
@@ -86,10 +86,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
       eventually(Timeout(streamingTimeout)) {
         val status = listener.terminationStatus
         assert(status != null)
-        assert(status.active === false) // must be inactive by the time onQueryTerm is called
-        assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-        assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+        assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
+        assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
       }
       listener.checkAsyncErrors()
   }
@@ -141,6 +139,92 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
+  test("exception should be reported in QueryTerminated") {
+    val listener = new QueryStatusCollector
+    withListenerAdded(listener) {
+      val input = MemoryStream[Int]
+      testStream(input.toDS.map(_ / 0))(
+        StartStream(),
+        AddData(input, 1),
+        ExpectFailure[SparkException](),
+        Assert {
+          spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+          assert(listener.terminationStatus !== null)
+          assert(listener.terminationException.isDefined)
+          assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
+          assert(listener.terminationStackTrace.nonEmpty)
+        }
+      )
+    }
+  }
+  test("QueryStarted serialization") {
+    val queryStartedInfo = new ContinuousQueryInfo(
+      "name",
+      Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
+      new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
+    val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
+    val json = JsonProtocol.sparkEventToJson(queryStarted)
+    val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[ContinuousQueryListener.QueryStarted]
+    assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
+  }
+  test("QueryProgress serialization") {
+    val queryProcessInfo = new ContinuousQueryInfo(
+      "name",
+      Seq(
+        new SourceStatus("source1", Some(LongOffset(0).toString)),
+        new SourceStatus("source2", Some(LongOffset(1).toString))),
+      new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
+    val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo)
+    val json = JsonProtocol.sparkEventToJson(queryProcess)
+    val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[ContinuousQueryListener.QueryProgress]
+    assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
+  }
+  test("QueryTerminated serialization") {
+    val queryTerminatedInfo = new ContinuousQueryInfo(
+      "name",
+      Seq(
+        new SourceStatus("source1", Some(LongOffset(0).toString)),
+        new SourceStatus("source2", Some(LongOffset(1).toString))),
+      new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
+    val exception = new RuntimeException("exception")
+    val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated(
+      queryTerminatedInfo,
+      Some(exception.getMessage),
+      exception.getStackTrace)
+    val json =
+      JsonProtocol.sparkEventToJson(queryQueryTerminated)
+    val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[ContinuousQueryListener.QueryTerminated]
+    assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
+    assert(queryQueryTerminated.exception === newQueryTerminated.exception)
+  }
+  private def assertContinuousQueryInfoEquals(
+      expected: ContinuousQueryInfo,
+      actual: ContinuousQueryInfo): Unit = {
+    assert(expected.name === actual.name)
+    assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
+    expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
+      case (expectedSource, actualSource) =>
+        assertSourceStatus(expectedSource, actualSource)
+    }
+    assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
+  }
+  private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
+  private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
   private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
     try {
@@ -164,9 +248,12 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // to catch errors in the async listener events
     @volatile private var asyncTestWaiter = new Waiter
-    @volatile var startStatus: QueryStatus = null
-    @volatile var terminationStatus: QueryStatus = null
-    val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
+    @volatile var startStatus: ContinuousQueryInfo = null
+    @volatile var terminationStatus: ContinuousQueryInfo = null
+    @volatile var terminationException: Option[String] = null
+    @volatile var terminationStackTrace: Seq[StackTraceElement] = null
+    val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]
     def reset(): Unit = {
       startStatus = null
@@ -182,35 +269,25 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
     override def onQueryStarted(queryStarted: QueryStarted): Unit = {
       asyncTestWaiter {
-        startStatus = QueryStatus(queryStarted.query)
+        startStatus = queryStarted.queryInfo
       }
     }
     override def onQueryProgress(queryProgress: QueryProgress): Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryProgress called before onQueryStarted")
-        progressStatuses.add(QueryStatus(queryProgress.query))
+        progressStatuses.add(queryProgress.queryInfo)
       }
     }
     override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-        terminationStatus = QueryStatus(queryTerminated.query)
+        terminationStatus = queryTerminated.queryInfo
+        terminationException = queryTerminated.exception
+        terminationStackTrace = queryTerminated.stackTrace
       }
       asyncTestWaiter.dismiss()
     }
   }
-  case class QueryStatus(
-      active: Boolean,
-      exception: Option[Exception],
-      sourceStatuses: Array[SourceStatus],
-      sinkStatus: SinkStatus)
-  object QueryStatus {
-    def apply(query: ContinuousQuery): QueryStatus = {
-      QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
-    }
-  }
 }

ContinuousQuerySuite.scala

@@ -66,21 +66,21 @@ class ContinuousQuerySuite extends StreamTest {
     testStream(mapped)(
       AssertOnQuery(_.sourceStatuses.length === 1),
       AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.sourceStatuses(0).offset === None),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === None),
       AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString),
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))),
-      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString),
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3, 6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))),
-      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
      AddData(inputData, 0),
       ExpectFailure[SparkException],
-      AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))),
-      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1)))
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString)
     )
   }