[SPARK-31953][SS] Add Spark Structured Streaming History Server Support

### What changes were proposed in this pull request?

Add Spark Structured Streaming History Server Support.

### Why are the changes needed?

Today the Structured Streaming tab is only available in a live application's UI. This PR adds a streaming query history server plugin that replays `StreamingQueryListener` events from the event log, so the tab can also be rebuilt in the Spark History Server (SHS):

![image](https://user-images.githubusercontent.com/7402327/84248291-d26cfe80-ab3b-11ea-86d2-98205fa2bcc4.png)
![image](https://user-images.githubusercontent.com/7402327/84248347-e44ea180-ab3b-11ea-81de-eefe207656f2.png)
![image](https://user-images.githubusercontent.com/7402327/84248396-f0d2fa00-ab3b-11ea-9b0d-e410115471b0.png)

- Follow-ups
  - Query duration should not keep updating in the history UI.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated the existing unit tests and added `StreamingQueryHistorySuite`, which rebuilds the UI from a bundled event log (`local-1596020211915`).

Closes #28781 from uncleGen/SPARK-31953.

Lead-authored-by: uncleGen <hustyugm@gmail.com>
Co-authored-by: Genmao Yu <hustyugm@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
Committed by Shixiong Zhu on 2020-12-02 17:11:51 -08:00 · commit 4f96670358 · parent f94cb53a90
17 changed files with 693 additions and 158 deletions


@@ -123,6 +123,7 @@ SessionHandler.java
GangliaReporter.java
application_1578436911597_0052
config.properties
local-1596020211915
app-20200706201101-0003
py.typed
_metadata


@@ -1 +1,2 @@
org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin
org.apache.spark.sql.execution.ui.StreamingQueryHistoryServerPlugin
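For context, the History Server discovers these plugins through Java's standard `ServiceLoader` mechanism, so registering the new class in this `META-INF/services` file is all that is needed for it to be picked up. A hedged sketch of the discovery pattern (not the literal SHS code; `AppHistoryServerPlugin` is Spark-internal, so this assumes code living inside Spark's package tree):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.status.AppHistoryServerPlugin

// Hedged sketch: every plugin class registered under
// META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
// is picked up via the standard ServiceLoader mechanism...
val loader = Thread.currentThread().getContextClassLoader
val plugins: Seq[AppHistoryServerPlugin] =
  ServiceLoader.load(classOf[AppHistoryServerPlugin], loader).asScala.toSeq

// ...and applied in ascending displayOrder when an application's UI is rebuilt.
plugins.sortBy(_.displayOrder).foreach(p => println(p.getClass.getName))
```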


@@ -31,16 +31,21 @@ import org.apache.spark.util.ListenerBus
* Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
* to StreamingQueryListeners.
*
* Note that each bus and its registered listeners are associated with a single SparkSession
* Note 1: Each bus and its registered listeners are associated with a single SparkSession
* and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
* those queries that were started in the associated SparkSession.
*
* Note 2: To rebuild Structured Streaming UI in SHS, this bus will be registered into
* [[org.apache.spark.scheduler.ReplayListenerBus]]. We check whether `sparkListenerBus` is
* defined to determine how to process [[StreamingQueryListener.Event]]s: if it is not defined,
* this bus is being used to replay all streaming query events from the event log.
*/
class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {
import StreamingQueryListener._
sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
sparkListenerBus.foreach(_.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY))
/**
* RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -67,11 +72,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
event match {
case s: QueryStartedEvent =>
activeQueryRunIds.synchronized { activeQueryRunIds += s.runId }
sparkListenerBus.post(s)
sparkListenerBus.foreach(bus => bus.post(s))
// post to local listeners to trigger callbacks
postToAll(s)
case _ =>
sparkListenerBus.post(event)
sparkListenerBus.foreach(bus => bus.post(event))
}
}
@@ -95,7 +100,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
// synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
// we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
// thread
if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
//
// When loaded by the Spark History Server, we should process all events coming from the
// replay listener bus.
if (sparkListenerBus.isEmpty || !LiveListenerBus.withinListenerThread.value ||
!e.isInstanceOf[QueryStartedEvent]) {
postToAll(e)
}
case _ =>
@@ -110,7 +119,10 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
listener: StreamingQueryListener,
event: StreamingQueryListener.Event): Unit = {
def shouldReport(runId: UUID): Boolean = {
activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
// When loaded by the Spark History Server, we should process all events coming from the
// replay listener bus.
sparkListenerBus.isEmpty ||
activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
}
event match {
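To make the two modes concrete, here is a minimal sketch of the two call sites that appear later in this diff (assuming a `sparkSession` and a replay context in scope):

```scala
// Live application (see StreamingQueryManager below): events are forwarded
// to the shared LiveListenerBus of the SparkContext.
val liveBus = new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus))

// History Server replay (see StreamingQueryHistoryServerPlugin below): there is
// no LiveListenerBus, so the bus only dispatches the events it receives from the
// ReplayListenerBus that re-reads the event log.
val replayBus = new StreamingQueryListenerBus(None)
```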


@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.ui
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus
import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI
class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {
override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
val listenerBus = new StreamingQueryListenerBus(None)
listenerBus.addListener(new StreamingQueryStatusListener(conf, store))
Seq(listenerBus)
}
override def setupUI(ui: SparkUI): Unit = {
val streamingQueryStatusStore = new StreamingQueryStatusStore(ui.store.store)
if (streamingQueryStatusStore.allQueryUIData.nonEmpty) {
new StreamingQueryTab(streamingQueryStatusStore, ui)
}
}
override def displayOrder: Int = 1
}
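Roughly, the History Server exercises such a plugin like this when rebuilding an application's UI. The sketch below paraphrases the replay wiring and is not the literal SHS code; `conf`, `trackingStore`, `replayBus`, and `ui` are assumed to be in scope:

```scala
// Hedged sketch of the SHS side of the plugin lifecycle.
val plugin = new StreamingQueryHistoryServerPlugin

// 1. Create the listeners; the bus is built with None (no LiveListenerBus).
val listeners = plugin.createListeners(conf, trackingStore)

// 2. Attach them to the bus that replays the event log; every logged
//    StreamingQueryListener event is re-dispatched through them.
listeners.foreach(replayBus.addListener)

// 3. Build the tab last, so it is only added when replay produced query data.
plugin.setupUI(ui)
```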


@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.ui
import java.util.UUID
import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper, StreamingQueryUIData}
import org.apache.spark.status.KVUtils
import org.apache.spark.util.kvstore.KVStore
/**
* Provides a view of a KVStore with methods that make it easy to query Streaming Query state.
* There's no state kept in this class, so it's ok to have multiple instances of it in an
* application.
*/
class StreamingQueryStatusStore(store: KVStore) {
def allQueryUIData: Seq[StreamingQueryUIData] = {
val view = store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L)
KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData)
}
// visible for test
private[sql] def getQueryProgressData(runId: UUID): Seq[StreamingQueryProgressWrapper] = {
val view = store.view(classOf[StreamingQueryProgressWrapper])
.index("runId").first(runId.toString).last(runId.toString)
KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
}
private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = {
val runId = summary.runId.toString
val view = store.view(classOf[StreamingQueryProgressWrapper])
.index("runId").first(runId).last(runId)
val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
.map(_.progress).sortBy(_.timestamp).toArray
StreamingQueryUIData(summary, recentProgress)
}
}
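As a usage sketch (assuming a `SparkUI` named `ui`, as in the plugin above), reading the store back looks like:

```scala
// Hedged usage sketch: list every streaming query recorded in the KVStore.
val statusStore = new StreamingQueryStatusStore(ui.store.store)
statusStore.allQueryUIData.foreach { uiData =>
  val s = uiData.summary
  println(s"name=${s.name} runId=${s.runId} active=${s.isActive} " +
    s"progressEntries=${uiData.recentProgress.length}")
}
```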


@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab, StreamingQueryStatusStore}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
import org.apache.spark.status.ElementTrackingStore
@@ -111,9 +111,9 @@ private[sql] class SharedState(
lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
sparkContext.ui.flatMap { ui =>
if (conf.get(STREAMING_UI_ENABLED)) {
val statusListener = new StreamingQueryStatusListener(conf)
new StreamingQueryTab(statusListener, ui)
Some(statusListener)
val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
new StreamingQueryTab(new StreamingQueryStatusStore(kvStore), ui)
Some(new StreamingQueryStatusListener(conf, kvStore))
} else {
None
}


@@ -49,7 +49,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
private val listenerBus =
new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus))
@GuardedBy("activeQueriesSharedLock")
private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]


@@ -40,8 +40,8 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
}
private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
.partition(_.isActive)
val (activeQueries, inactiveQueries) =
parent.store.allQueryUIData.partition(_.summary.isActive)
val content = mutable.ListBuffer[Node]()
// show active queries table only if there is at least one active query
@@ -176,7 +176,7 @@ class StreamingQueryPagedTable(
val streamingQuery = query.streamingUIData
val statisticsLink = "%s/%s/statistics?id=%s"
.format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix,
streamingQuery.runId)
streamingQuery.summary.runId)
def details(detail: Any): Seq[Node] = {
if (isActive) {
@@ -194,14 +194,14 @@ class StreamingQueryPagedTable(
<tr>
<td>{UIUtils.getQueryName(streamingQuery)}</td>
<td>{UIUtils.getQueryStatus(streamingQuery)}</td>
<td>{streamingQuery.id}</td>
<td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
<td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td>
<td>{streamingQuery.summary.id}</td>
<td><a href={statisticsLink}>{streamingQuery.summary.runId}</a></td>
<td>{SparkUIUtils.formatDate(streamingQuery.summary.startTimestamp)}</td>
<td>{SparkUIUtils.formatDurationVerbose(query.duration)}</td>
<td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>
{details(streamingQuery.exception.getOrElse("-"))}
{details(streamingQuery.summary.exception.getOrElse("-"))}
</tr>
}
}
@@ -222,32 +222,32 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St
override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to)
private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
private def streamingRow(uiData: StreamingQueryUIData): StructuredStreamingRow = {
val duration = if (isActive) {
System.currentTimeMillis() - query.startTimestamp
System.currentTimeMillis() - uiData.summary.startTimestamp
} else {
withNoProgress(query, {
val endTimeMs = query.lastProgress.timestamp
parseProgressTimestamp(endTimeMs) - query.startTimestamp
withNoProgress(uiData, {
val endTimeMs = uiData.lastProgress.timestamp
parseProgressTimestamp(endTimeMs) - uiData.summary.startTimestamp
}, 0)
}
val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
query.recentProgress.length)
val avgInput = (uiData.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
uiData.recentProgress.length)
val avgProcess = (query.recentProgress.map(p =>
withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length)
val avgProcess = (uiData.recentProgress.map(p =>
withNumberInvalid(p.processedRowsPerSecond)).sum / uiData.recentProgress.length)
StructuredStreamingRow(duration, avgInput, avgProcess, query)
StructuredStreamingRow(duration, avgInput, avgProcess, uiData)
}
private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData))
case "Status" => Ordering.by(q => UIUtils.getQueryStatus(q.streamingUIData))
case "ID" => Ordering.by(_.streamingUIData.id)
case "Run ID" => Ordering.by(_.streamingUIData.runId)
case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp)
case "Name" => Ordering.by(row => UIUtils.getQueryName(row.streamingUIData))
case "Status" => Ordering.by(row => UIUtils.getQueryStatus(row.streamingUIData))
case "ID" => Ordering.by(_.streamingUIData.summary.id)
case "Run ID" => Ordering.by(_.streamingUIData.summary.runId)
case "Start Time" => Ordering.by(_.streamingUIData.summary.startTimestamp)
case "Duration" => Ordering.by(_.duration)
case "Avg Input /sec" => Ordering.by(_.avgInput)
case "Avg Process /sec" => Ordering.by(_.avgProcess)


@@ -58,8 +58,8 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
val query = parent.statusListener.allQueryStatus.find { case q =>
q.runId.equals(UUID.fromString(parameterId))
val query = parent.store.allQueryUIData.find { uiData =>
uiData.summary.runId.equals(UUID.fromString(parameterId))
}.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming query $parameterId"))
val resources = generateLoadResources(request)
@@ -109,34 +109,35 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
<script>{Unparsed(js)}</script>
}
def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
val duration = if (query.isActive) {
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
def generateBasicInfo(uiData: StreamingQueryUIData): Seq[Node] = {
val duration = if (uiData.summary.isActive) {
val durationMs = System.currentTimeMillis() - uiData.summary.startTimestamp
SparkUIUtils.formatDurationVerbose(durationMs)
} else {
withNoProgress(query, {
val end = query.lastProgress.timestamp
val start = query.recentProgress.head.timestamp
withNoProgress(uiData, {
val end = uiData.lastProgress.timestamp
val start = uiData.recentProgress.head.timestamp
SparkUIUtils.formatDurationVerbose(
parseProgressTimestamp(end) - parseProgressTimestamp(start))
}, "-")
}
val name = UIUtils.getQueryName(query)
val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0)
val name = UIUtils.getQueryName(uiData)
val numBatches = withNoProgress(uiData, { uiData.lastProgress.batchId + 1L }, 0)
<div>Running batches for
<strong>
{duration}
</strong>
since
<strong>
{SparkUIUtils.formatDate(query.startTimestamp)}
{SparkUIUtils.formatDate(uiData.summary.startTimestamp)}
</strong>
(<strong>{numBatches}</strong> completed batches)
</div>
<br />
<div><strong>Name: </strong>{name}</div>
<div><strong>Id: </strong>{query.id}</div>
<div><strong>RunId: </strong>{query.runId}</div>
<div><strong>Id: </strong>{uiData.summary.id}</div>
<div><strong>RunId: </strong>{uiData.summary.runId}</div>
<br />
}


@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
import org.apache.spark.status.{ElementTrackingStore, KVUtils}
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which contains all
* UI data for both active and inactive query.
* TODO: Add support for history server.
*/
private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
/**
* We use runId as the key here instead of id in active query status map,
* because the runId is unique for every started query, even if it is a restart.
*/
private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
private[sql] class StreamingQueryStatusListener(
conf: SparkConf,
store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) { count =>
cleanupInactiveQueries(count)
}
// Events from the same query run will never be processed concurrently, so it's safe to
// access `progressIds` without any protection.
private val queryToProgress = new ConcurrentHashMap[UUID, mutable.Queue[String]]()
private def cleanupInactiveQueries(count: Long): Unit = {
val view = store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
val numInactiveQueries = inactiveQueries.size
if (numInactiveQueries <= inactiveQueryStatusRetention) {
return
}
val toDelete = inactiveQueries.sortBy(_.endTimestamp.get)
.take(numInactiveQueries - inactiveQueryStatusRetention)
val runIds = toDelete.map { e =>
store.delete(e.getClass, e.runId)
e.runId.toString
}
// Delete wrappers in one pass, as deleting them for each summary is slow
store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], "runId", runIds)
}
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
activeQueryStatus.putIfAbsent(event.runId,
new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp))
store.write(new StreamingQueryData(
event.name,
event.id,
event.runId,
isActive = true,
None,
startTimestamp
), checkTriggers = true)
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
val queryStatus = activeQueryStatus.getOrDefault(
event.progress.runId,
new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
batchTimestamp))
queryStatus.updateProcess(event.progress, streamingProgressRetention)
}
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
val queryStatus = activeQueryStatus.remove(event.runId)
if (queryStatus != null) {
queryStatus.queryTerminated(event)
inactiveQueryStatus += queryStatus
while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
inactiveQueryStatus.dequeue()
}
val runId = event.progress.runId
val batchId = event.progress.batchId
val timestamp = event.progress.timestamp
if (!queryToProgress.containsKey(runId)) {
queryToProgress.put(runId, mutable.Queue.empty[String])
}
val progressIds = queryToProgress.get(runId)
progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
store.write(new StreamingQueryProgressWrapper(event.progress))
while (progressIds.length > streamingProgressRetention) {
val uniqueId = progressIds.dequeue
store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
}
}
def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
val querySummary = store.read(classOf[StreamingQueryData], event.runId)
val curTime = System.currentTimeMillis()
store.write(new StreamingQueryData(
querySummary.name,
querySummary.id,
querySummary.runId,
isActive = false,
querySummary.exception,
querySummary.startTimestamp,
Some(curTime)
), checkTriggers = true)
queryToProgress.remove(event.runId)
}
}
private[sql] class StreamingQueryData(
val name: String,
val id: UUID,
@KVIndexParam val runId: UUID,
@KVIndexParam("active") val isActive: Boolean,
val exception: Option[String],
@KVIndexParam("startTimestamp") val startTimestamp: Long,
val endTimestamp: Option[Long] = None)
/**
* This class contains all message related to UI display, each instance corresponds to a single
* [[org.apache.spark.sql.streaming.StreamingQuery]].
*/
private[ui] class StreamingQueryUIData(
val name: String,
val id: UUID,
val runId: UUID,
val startTimestamp: Long) {
private[sql] case class StreamingQueryUIData(
summary: StreamingQueryData,
recentProgress: Array[StreamingQueryProgress]) {
/** Holds the most recent query progress updates. */
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
private var _isActive = true
private var _exception: Option[String] = None
def isActive: Boolean = synchronized { _isActive }
def exception: Option[String] = synchronized { _exception }
def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
_isActive = false
_exception = event.exception
}
def updateProcess(
newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
progressBuffer += newProgress
while (progressBuffer.length >= retentionNum) {
progressBuffer.dequeue()
def lastProgress: StreamingQueryProgress = {
if (recentProgress.nonEmpty) {
recentProgress.last
} else {
null
}
}
}
def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
progressBuffer.toArray
}
private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
@JsonIgnore @KVIndex
private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)
def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
progressBuffer.lastOption.orNull
@JsonIgnore @KVIndex("runId")
private def runIdIndex: String = progress.runId.toString
}
private[sql] object StreamingQueryProgressWrapper {
/**
* `timestamp` is added into the unique id to support reporting "empty" query progress,
* in which no data arrives but the batchId stays the same.
*/
def getUniqueId(
runId: UUID,
batchId: Long,
timestamp: String): String = {
s"${runId}_${batchId}_$timestamp"
}
}
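For example, two "empty" progress reports for the same run and batch still get distinct KVStore keys because the trigger timestamp differs. Illustrative values (the runId is taken from the test suite later in this diff; the timestamp is made up):

```scala
// getUniqueId simply concatenates runId, batchId, and the trigger timestamp.
val runId = UUID.fromString("e225d92f-2545-48f8-87a2-9c0309580f8a")
StreamingQueryProgressWrapper.getUniqueId(runId, 2L, "2020-07-29T10:00:00.000Z")
// => "e225d92f-2545-48f8-87a2-9c0309580f8a_2_2020-07-29T10:00:00.000Z"
```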


@@ -17,10 +17,11 @@
package org.apache.spark.sql.streaming.ui
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[sql] class StreamingQueryTab(
val statusListener: StreamingQueryStatusListener,
val store: StreamingQueryStatusStore,
sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
override val name = "Structured Streaming"


@@ -46,19 +46,19 @@ private[ui] object UIUtils {
}
}
def getQueryName(query: StreamingQueryUIData): String = {
if (query.name == null || query.name.isEmpty) {
def getQueryName(uiData: StreamingQueryUIData): String = {
if (uiData.summary.name == null || uiData.summary.name.isEmpty) {
"<no name>"
} else {
query.name
uiData.summary.name
}
}
def getQueryStatus(query: StreamingQueryUIData): String = {
if (query.isActive) {
def getQueryStatus(uiData: StreamingQueryUIData): String = {
if (uiData.summary.isActive) {
"RUNNING"
} else {
query.exception.map(_ => "FAILED").getOrElse("FINISHED")
uiData.summary.exception.map(_ => "FAILED").getOrElse("FINISHED")
}
}

(Diff of the event log test fixture `spark-events/local-1596020211915` suppressed because one or more lines are too long.)


@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.history
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.HISTORY_LOG_DIR
import org.apache.spark.util.ManualClock
object Utils {
def withFsHistoryProvider(logDir: String)(fn: FsHistoryProvider => Unit): Unit = {
var provider: FsHistoryProvider = null
try {
val clock = new ManualClock()
val conf = new SparkConf().set(HISTORY_LOG_DIR, logDir)
// assign to the outer `var` (a shadowing `val` here would mean stop() is never called)
provider = new FsHistoryProvider(conf, clock)
provider.checkForLogs()
fn(provider)
} finally {
if (provider != null) {
provider.stop()
provider = null
}
}
}
}


@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.streaming.ui
import java.util.Locale
import javax.servlet.http.HttpServletRequest
import org.mockito.Mockito.{mock, when}
import org.scalatest.BeforeAndAfter
import org.apache.spark.deploy.history.{Utils => HsUtils}
import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.sql.test.SharedSparkSession
class StreamingQueryHistorySuite extends SharedSparkSession with BeforeAndAfter {
test("support streaming query events") {
val logDir = Thread.currentThread().getContextClassLoader.getResource("spark-events").toString
HsUtils.withFsHistoryProvider(logDir) { provider =>
val appUi = provider.getAppUI("local-1596020211915", None).getOrElse {
assert(false, "Failed to load event log of local-1596020211915.")
null
}
assert(appUi.ui.appName == "StructuredKafkaWordCount")
assert(appUi.ui.store.store.count(classOf[StreamingQueryData]) == 1)
assert(appUi.ui.store.store.count(classOf[StreamingQueryProgressWrapper]) == 8)
val store = new StreamingQueryStatusStore(appUi.ui.store.store)
val tab = new StreamingQueryTab(store, appUi.ui)
val request = mock(classOf[HttpServletRequest])
var html = new StreamingQueryPage(tab).render(request)
.toString().toLowerCase(Locale.ROOT)
// 81.39: Avg Input /sec
assert(html.contains("81.39"))
// 157.05: Avg Process /sec
assert(html.contains("157.05"))
val id = "8d268dc2-bc9c-4be8-97a9-b135d2943028"
val runId = "e225d92f-2545-48f8-87a2-9c0309580f8a"
when(request.getParameter("id")).thenReturn(runId)
html = new StreamingQueryStatisticsPage(tab).render(request)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("<strong>8</strong> completed batches"))
assert(html.contains(id))
assert(html.contains(runId))
}
}
}


@@ -20,11 +20,13 @@ package org.apache.spark.sql.streaming.ui
import java.util.{Locale, UUID}
import javax.servlet.http.HttpServletRequest
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.BeforeAndAfter
import scala.xml.Node
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.ui.SparkUI
@@ -35,26 +37,26 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
val id = UUID.randomUUID()
val request = mock(classOf[HttpServletRequest])
val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(tab.statusListener).thenReturn(statusListener)
when(tab.store).thenReturn(store)
val streamQuery = createStreamQueryUIData(id)
when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
when(store.allQueryUIData).thenReturn(Seq(streamQuery))
var html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("active streaming queries (1)"))
when(streamQuery.isActive).thenReturn(false)
when(streamQuery.exception).thenReturn(None)
when(streamQuery.summary.isActive).thenReturn(false)
when(streamQuery.summary.exception).thenReturn(None)
html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("completed streaming queries (1)"))
assert(html.contains("finished"))
when(streamQuery.isActive).thenReturn(false)
when(streamQuery.exception).thenReturn(Option("exception in query"))
when(streamQuery.summary.isActive).thenReturn(false)
when(streamQuery.summary.exception).thenReturn(Option("exception in query"))
html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("completed streaming queries (1)"))
@@ -66,17 +68,20 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
val id = UUID.randomUUID()
val request = mock(classOf[HttpServletRequest])
val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS)
when(request.getParameter("id")).thenReturn(id.toString)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(tab.store).thenReturn(store)
val ui = mock(classOf[SparkUI])
when(request.getParameter("id")).thenReturn(id.toString)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(tab.statusListener).thenReturn(statusListener)
when(ui.conf).thenReturn(new SparkConf())
when(tab.parent).thenReturn(ui)
val streamQuery = createStreamQueryUIData(id)
when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
when(store.allQueryUIData).thenReturn(Seq(streamQuery))
val html = renderStreamingQueryStatisticsPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
@@ -94,15 +99,18 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
when(progress.batchId).thenReturn(2)
when(progress.prettyJson).thenReturn("""{"a":1}""")
val summary = mock(classOf[StreamingQueryData], RETURNS_SMART_NULLS)
when(summary.isActive).thenReturn(true)
when(summary.name).thenReturn("query")
when(summary.id).thenReturn(id)
when(summary.runId).thenReturn(id)
when(summary.startTimestamp).thenReturn(1L)
when(summary.exception).thenReturn(None)
val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
when(streamQuery.isActive).thenReturn(true)
when(streamQuery.name).thenReturn("query")
when(streamQuery.id).thenReturn(id)
when(streamQuery.runId).thenReturn(id)
when(streamQuery.startTimestamp).thenReturn(1L)
when(streamQuery.summary).thenReturn(summary)
when(streamQuery.lastProgress).thenReturn(progress)
when(streamQuery.recentProgress).thenReturn(Array(progress))
when(streamQuery.exception).thenReturn(None)
streamQuery
}


@@ -17,19 +17,28 @@
package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.text.SimpleDateFormat
import java.util.{Date, UUID}
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
import org.apache.spark.sql.streaming
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore
class StreamingQueryStatusListenerSuite extends StreamTest {
test("onQueryStarted, onQueryProgress, onQueryTerminated") {
val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
val queryStore = new StreamingQueryStatusStore(kvStore)
// hanlde query started event
// handle query started event
val id = UUID.randomUUID()
val runId = UUID.randomUUID()
val startEvent = new StreamingQueryListener.QueryStartedEvent(
@@ -37,8 +46,9 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryStarted(startEvent)
// result checking
assert(listener.activeQueryStatus.size() == 1)
assert(listener.activeQueryStatus.get(runId).name == "test")
assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData =>
uiData.summary.runId == runId && uiData.summary.name.equals("test")))
// handle query progress event
val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
@@ -53,28 +63,32 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryProgress(processEvent)
// result checking
val activeQuery = listener.activeQueryStatus.get(runId)
assert(activeQuery.isActive)
assert(activeQuery.recentProgress.length == 1)
assert(activeQuery.lastProgress.id == id)
assert(activeQuery.lastProgress.runId == runId)
assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0)
assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0)
assert(activeQuery.lastProgress.batchId == 2)
assert(activeQuery.lastProgress.prettyJson == """{"a":1}""")
val activeQuery =
queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == runId)
assert(activeQuery.isDefined)
assert(activeQuery.get.summary.isActive)
assert(activeQuery.get.recentProgress.length == 1)
assert(activeQuery.get.lastProgress.id == id)
assert(activeQuery.get.lastProgress.runId == runId)
assert(activeQuery.get.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
assert(activeQuery.get.lastProgress.inputRowsPerSecond == 10.0)
assert(activeQuery.get.lastProgress.processedRowsPerSecond == 12.0)
assert(activeQuery.get.lastProgress.batchId == 2)
assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""")
// handle terminate event
val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
listener.onQueryTerminated(terminateEvent)
assert(!listener.inactiveQueryStatus.head.isActive)
assert(listener.inactiveQueryStatus.head.runId == runId)
assert(listener.inactiveQueryStatus.head.id == id)
assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive)
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId)
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id)
}
test("same query start multiple times") {
val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
val queryStore = new StreamingQueryStatusStore(kvStore)
// handle first time start
val id = UUID.randomUUID()
@@ -94,11 +108,106 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryStarted(startEvent1)
// result checking
assert(listener.activeQueryStatus.size() == 1)
assert(listener.inactiveQueryStatus.length == 1)
assert(listener.activeQueryStatus.containsKey(runId1))
assert(listener.activeQueryStatus.get(runId1).id == id)
assert(listener.inactiveQueryStatus.head.runId == runId0)
assert(listener.inactiveQueryStatus.head.id == id)
assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1)
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId == runId1))
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData =>
uiData.summary.runId == runId1 && uiData.summary.id == id))
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId0)
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id)
}
test("test small retained queries") {
val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
val conf = spark.sparkContext.conf
conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2")
val listener = new StreamingQueryStatusListener(conf, kvStore)
val queryStore = new StreamingQueryStatusStore(kvStore)
def addNewQuery(): (UUID, UUID) = {
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
format.setTimeZone(getTimeZone("UTC"))
val id = UUID.randomUUID()
val runId = UUID.randomUUID()
val startEvent = new StreamingQueryListener.QueryStartedEvent(
id, runId, "test1", format.format(new Date(System.currentTimeMillis())))
listener.onQueryStarted(startEvent)
(id, runId)
}
def checkInactiveQueryStatus(numInactives: Int, targetInactives: Seq[UUID]): Unit = {
eventually(timeout(10.seconds)) {
val inactiveQueries = queryStore.allQueryUIData.filter(!_.summary.isActive)
assert(inactiveQueries.size == numInactives)
assert(inactiveQueries.map(_.summary.id).toSet == targetInactives.toSet)
}
}
val (id1, runId1) = addNewQuery()
val (id2, runId2) = addNewQuery()
val (id3, runId3) = addNewQuery()
assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0)
val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None)
listener.onQueryTerminated(terminateEvent1)
checkInactiveQueryStatus(1, Seq(id1))
val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None)
listener.onQueryTerminated(terminateEvent2)
checkInactiveQueryStatus(2, Seq(id1, id2))
val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None)
listener.onQueryTerminated(terminateEvent3)
checkInactiveQueryStatus(2, Seq(id2, id3))
}
test("test small retained progress") {
val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
val conf = spark.sparkContext.conf
conf.set(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "5")
val listener = new StreamingQueryStatusListener(conf, kvStore)
val queryStore = new StreamingQueryStatusStore(kvStore)
val id = UUID.randomUUID()
val runId = UUID.randomUUID()
val startEvent = new StreamingQueryListener.QueryStartedEvent(
id, runId, "test", "2016-12-05T20:54:20.827Z")
listener.onQueryStarted(startEvent)
var batchId: Int = 0
def addQueryProgress(): Unit = {
val progress = mockProgressData(id, runId)
val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress)
listener.onQueryProgress(processEvent)
}
def mockProgressData(id: UUID, runId: UUID): StreamingQueryProgress = {
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
format.setTimeZone(getTimeZone("UTC"))
val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
when(progress.id).thenReturn(id)
when(progress.runId).thenReturn(runId)
when(progress.timestamp).thenReturn(format.format(new Date(System.currentTimeMillis())))
when(progress.inputRowsPerSecond).thenReturn(10.0)
when(progress.processedRowsPerSecond).thenReturn(12.0)
when(progress.batchId).thenReturn(batchId)
when(progress.prettyJson).thenReturn("""{"a":1}""")
batchId += 1
progress
}
def checkQueryProcessData(targetNum: Int): Unit = {
eventually(timeout(10.seconds)) {
assert(queryStore.getQueryProgressData(runId).size == targetNum)
}
}
Array.tabulate(4) { _ => addQueryProgress() }
checkQueryProcessData(4)
addQueryProgress()
checkQueryProcessData(5)
addQueryProgress()
checkQueryProcessData(5)
}
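The two static confs exercised by these tests can likewise be set when launching an application. A hedged example using the constants from `StaticSQLConf` as seen in this diff (static confs must be set before the session is created):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.StaticSQLConf

// Hedged example: cap how much streaming UI state is kept in the KVStore
// (and therefore how much survives into the History Server).
val spark = SparkSession.builder()
  .appName("streaming-ui-retention-example")
  .config(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "50")
  .config(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "200")
  .getOrCreate()
```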
}