diff --git a/docs/web-ui.md b/docs/web-ui.md index 69c9da6428..465f526ee5 100644 --- a/docs/web-ui.md +++ b/docs/web-ui.md @@ -426,11 +426,11 @@ queries. Currently, it contains the following metrics. * **Batch Duration.** The process duration of each batch. * **Operation Duration.** The amount of time taken to perform various operations in milliseconds. The tracked operations are listed as follows. - * addBatch: Adds result data of the current batch to the sink. - * getBatch: Gets a new batch of data to process. - * latestOffset: Gets the latest offsets for sources. - * queryPlanning: Generates the execution plan. - * walCommit: Writes the offsets to the metadata log. + * addBatch: Time taken to read the micro-batch's input data from the sources, process it, and write the batch's output to the sink. This should take the bulk of the micro-batch's time. + * getBatch: Time taken to prepare the logical query to read the input of the current micro-batch from the sources. + * latestOffset & getOffset: Time taken to query the maximum available offset for this source. + * queryPlanning: Time taken to generates the execution plan. + * walCommit: Time taken to write the offsets to the metadata log. As an early-release version, the statistics page is still under development and will be improved in future releases. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index e022bfb683..e0731db1f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -566,8 +566,7 @@ class MicroBatchExecution( val nextBatch = new Dataset(lastExecution, RowEncoder(lastExecution.analyzed.schema)) - val batchSinkProgress: Option[StreamWriterCommitProgress] = - reportTimeTaken("addBatch") { + val batchSinkProgress: Option[StreamWriterCommitProgress] = reportTimeTaken("addBatch") { SQLExecution.withNewExecutionId(lastExecution) { sink match { case s: Sink => s.addBatch(currentBatchId, nextBatch)