From 59daf91b7cfb50b1c20eb41959921fc03103b739 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 14 Dec 2017 14:31:21 -0800 Subject: [PATCH] [SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution. ## What changes were proposed in this pull request? StreamExecution is now an abstract base class, which MicroBatchExecution (the current StreamExecution) inherits. When continuous processing is implemented, we'll have a new ContinuousExecution implementation of StreamExecution. A few fields are also renamed to make them less microbatch-specific. ## How was this patch tested? refactoring only Author: Jose Torres Closes #19926 from joseph-torres/continuous-refactor. --- .../{BatchCommitLog.scala => CommitLog.scala} | 8 +- .../streaming/MicroBatchExecution.scala | 407 ++++++++++++++++ .../execution/streaming/StreamExecution.scala | 457 ++---------------- .../sql/streaming/StreamingQueryManager.scala | 2 +- .../streaming/EventTimeWatermarkSuite.scala | 4 +- .../sql/streaming/FileStreamSourceSuite.scala | 5 +- .../spark/sql/streaming/StreamSuite.scala | 2 +- .../spark/sql/streaming/StreamTest.scala | 20 +- .../streaming/StreamingAggregationSuite.scala | 4 +- .../sql/streaming/StreamingQuerySuite.scala | 4 +- 10 files changed, 484 insertions(+), 429 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{BatchCommitLog.scala => CommitLog.scala} (93%) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala similarity index 93% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala index 5e24e8fc4e..5b11424255 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala @@ -42,10 +42,10 @@ import org.apache.spark.sql.SparkSession * line 1: version * line 2: metadata (optional json string) */ -class BatchCommitLog(sparkSession: SparkSession, path: String) +class CommitLog(sparkSession: SparkSession, path: String) extends HDFSMetadataLog[String](sparkSession, path) { - import BatchCommitLog._ + import CommitLog._ def add(batchId: Long): Unit = { super.add(batchId, EMPTY_JSON) @@ -53,7 +53,7 @@ class BatchCommitLog(sparkSession: SparkSession, path: String) override def add(batchId: Long, metadata: String): Boolean = { throw new UnsupportedOperationException( - "BatchCommitLog does not take any metadata, use 'add(batchId)' instead") + "CommitLog does not take any metadata, use 'add(batchId)' instead") } override protected def deserialize(in: InputStream): String = { @@ -76,7 +76,7 @@ class BatchCommitLog(sparkSession: SparkSession, path: String) } } -object BatchCommitLog { +object CommitLog { private val VERSION = 1 private val EMPTY_JSON = "{}" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala new file mode 100644 index 0000000000..a67dda99dc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.util.{Clock, Utils} + +class MicroBatchExecution( + sparkSession: SparkSession, + name: String, + checkpointRoot: String, + analyzedPlan: LogicalPlan, + sink: Sink, + trigger: Trigger, + triggerClock: Clock, + outputMode: OutputMode, + deleteCheckpointOnStop: Boolean) + extends StreamExecution( + sparkSession, name, checkpointRoot, analyzedPlan, sink, + trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + private val triggerExecutor = trigger match { + case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => OneTimeExecutor() + case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + } + + override lazy val logicalPlan: LogicalPlan = { + assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in QueryExecutionThread " + + s"but the current thread was ${Thread.currentThread}") + var nextSourceId = 0L + val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() + val _logicalPlan = analyzedPlan.transform { + case streamingRelation@StreamingRelation(dataSource, _, output) => + toExecutionRelationMap.getOrElseUpdate(streamingRelation, { + // Materialize source to avoid creating it in every batch + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val source = dataSource.createSource(metadataPath) + nextSourceId += 1 + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "df.logicalPlan" has already used attributes of the previous `output`. + StreamingExecutionRelation(source, output)(sparkSession) + }) + } + sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source } + uniqueSources = sources.distinct + _logicalPlan + } + + /** + * Repeatedly attempts to run batches as data arrives. 
+ */ + protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { + triggerExecutor.execute(() => { + startTrigger() + + if (isActive) { + reportTimeTaken("triggerExecution") { + if (currentBatchId < 0) { + // We'll do this initialization only once + populateStartOffsets(sparkSessionForStream) + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) + logDebug(s"Stream running from $committedOffsets to $availableOffsets") + } else { + constructNextBatch() + } + if (dataAvailable) { + currentStatus = currentStatus.copy(isDataAvailable = true) + updateStatusMessage("Processing new data") + runBatch(sparkSessionForStream) + } + } + // Report trigger as finished and construct progress object. + finishTrigger(dataAvailable) + if (dataAvailable) { + // Update committed offsets. + commitLog.add(currentBatchId) + committedOffsets ++= availableOffsets + logDebug(s"batch ${currentBatchId} committed") + // We'll increase currentBatchId after we complete processing current batch's data + currentBatchId += 1 + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) + } else { + currentStatus = currentStatus.copy(isDataAvailable = false) + updateStatusMessage("Waiting for data to arrive") + Thread.sleep(pollingDelayMs) + } + } + updateStatusMessage("Waiting for next trigger") + isActive + }) + } + + /** + * Populate the start offsets to start the execution at the current offsets stored in the sink + * (i.e. avoid reprocessing data that we have already processed). This function must be called + * before any processing occurs and will populate the following fields: + * - currentBatchId + * - committedOffsets + * - availableOffsets + * The basic structure of this method is as follows: + * + * Identify (from the offset log) the offsets used to run the last batch + * IF last batch exists THEN + * Set the next batch to be executed as the last recovered batch + * Check the commit log to see which batch was committed last + * IF the last batch was committed THEN + * Call getBatch using the last batch start and end offsets + * // ^^^^ above line is needed since some sources assume last batch always re-executes + * Setup for a new batch i.e., start = last batch end, and identify new end + * DONE + * ELSE + * Identify a brand new batch + * DONE + */ + private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = { + offsetLog.getLatest() match { + case Some((latestBatchId, nextOffsets)) => + /* First assume that we are re-executing the latest known batch + * in the offset log */ + currentBatchId = latestBatchId + availableOffsets = nextOffsets.toStreamProgress(sources) + /* Initialize committed offsets to a committed batch, which at this + * is the second latest batch id in the offset log. 
*/ + if (latestBatchId != 0) { + val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse { + throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist") + } + committedOffsets = secondLatestBatchId.toStreamProgress(sources) + } + + // update offset metadata + nextOffsets.metadata.foreach { metadata => + OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf) + offsetSeqMetadata = OffsetSeqMetadata( + metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf) + } + + /* identify the current batch id: if commit log indicates we successfully processed the + * latest batch id in the offset log, then we can safely move to the next batch + * i.e., committedBatchId + 1 */ + commitLog.getLatest() match { + case Some((latestCommittedBatchId, _)) => + if (latestBatchId == latestCommittedBatchId) { + /* The last batch was successfully committed, so we can safely process a + * new next batch but first: + * Make a call to getBatch using the offsets from previous batch. + * because certain sources (e.g., KafkaSource) assume on restart the last + * batch will be executed before getOffset is called again. */ + availableOffsets.foreach { ao: (Source, Offset) => + val (source, end) = ao + if (committedOffsets.get(source).map(_ != end).getOrElse(true)) { + val start = committedOffsets.get(source) + source.getBatch(start, end) + } + } + currentBatchId = latestCommittedBatchId + 1 + committedOffsets ++= availableOffsets + // Construct a new batch be recomputing availableOffsets + constructNextBatch() + } else if (latestCommittedBatchId < latestBatchId - 1) { + logWarning(s"Batch completion log latest batch id is " + + s"${latestCommittedBatchId}, which is not trailing " + + s"batchid $latestBatchId by one") + } + case None => logInfo("no commit log present") + } + logDebug(s"Resuming at batch $currentBatchId with committed offsets " + + s"$committedOffsets and available offsets $availableOffsets") + case None => // We are starting this stream for the first time. + logInfo(s"Starting new streaming query.") + currentBatchId = 0 + constructNextBatch() + } + } + + /** + * Returns true if there is any new data available to be processed. + */ + private def dataAvailable: Boolean = { + availableOffsets.exists { + case (source, available) => + committedOffsets + .get(source) + .map(committed => committed != available) + .getOrElse(true) + } + } + + /** + * Queries all of the sources to see if any new data is available. When there is new data the + * batchId counter is incremented and a new log entry is written with the newest offsets. + */ + private def constructNextBatch(): Unit = { + // Check to see what new data is available. + val hasNewData = { + awaitProgressLock.lock() + try { + val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s => + updateStatusMessage(s"Getting offsets from $s") + reportTimeTaken("getOffset") { + (s, s.getOffset) + } + }.toMap + availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get) + + if (dataAvailable) { + true + } else { + noNewData = true + false + } + } finally { + awaitProgressLock.unlock() + } + } + if (hasNewData) { + var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs + // Update the eventTime watermarks if we find any in the plan. 
+ if (lastExecution != null) { + lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec => e + }.zipWithIndex.foreach { + case (e, index) if e.eventTimeStats.value.count > 0 => + logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") + val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs + val prevWatermarkMs = watermarkMsMap.get(index) + if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { + watermarkMsMap.put(index, newWatermarkMs) + } + + // Populate 0 if we haven't seen any data yet for this watermark node. + case (_, index) => + if (!watermarkMsMap.isDefinedAt(index)) { + watermarkMsMap.put(index, 0) + } + } + + // Update the global watermark to the minimum of all watermark nodes. + // This is the safest option, because only the global watermark is fault-tolerant. Making + // it the minimum of all individual watermarks guarantees it will never advance past where + // any individual watermark operator would be if it were in a plan by itself. + if(!watermarkMsMap.isEmpty) { + val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 + if (newWatermarkMs > batchWatermarkMs) { + logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") + batchWatermarkMs = newWatermarkMs + } else { + logDebug( + s"Event time didn't move: $newWatermarkMs < " + + s"$batchWatermarkMs") + } + } + } + offsetSeqMetadata = offsetSeqMetadata.copy( + batchWatermarkMs = batchWatermarkMs, + batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds + + updateStatusMessage("Writing offsets to log") + reportTimeTaken("walCommit") { + assert(offsetLog.add( + currentBatchId, + availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)), + s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + logInfo(s"Committed offsets for batch $currentBatchId. " + + s"Metadata ${offsetSeqMetadata.toString}") + + // NOTE: The following code is correct because runStream() processes exactly one + // batch at a time. If we add pipeline parallelism (multiple batches in flight at + // the same time), this cleanup logic will need to change. + + // Now that we've updated the scheduler's persistent checkpoint, it is safe for the + // sources to discard data from the previous batch. + if (currentBatchId != 0) { + val prevBatchOff = offsetLog.get(currentBatchId - 1) + if (prevBatchOff.isDefined) { + prevBatchOff.get.toStreamProgress(sources).foreach { + case (src, off) => src.commit(off) + } + } else { + throw new IllegalStateException(s"batch $currentBatchId doesn't exist") + } + } + + // It is now safe to discard the metadata beyond the minimum number to retain. + // Note that purge is exclusive, i.e. it purges everything before the target ID. + if (minLogEntriesToMaintain < currentBatchId) { + offsetLog.purge(currentBatchId - minLogEntriesToMaintain) + commitLog.purge(currentBatchId - minLogEntriesToMaintain) + } + } + } else { + awaitProgressLock.lock() + try { + // Wake up any threads that are waiting for the stream to progress. + awaitProgressLockCondition.signalAll() + } finally { + awaitProgressLock.unlock() + } + } + } + + /** + * Processes any data available between `availableOffsets` and `committedOffsets`. + * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with. + */ + private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = { + // Request unprocessed data from all sources. 
+ newData = reportTimeTaken("getBatch") { + availableOffsets.flatMap { + case (source, available) + if committedOffsets.get(source).map(_ != available).getOrElse(true) => + val current = committedOffsets.get(source) + val batch = source.getBatch(current, available) + assert(batch.isStreaming, + s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" + + s"${batch.queryExecution.logical}") + logDebug(s"Retrieving data from $source: $current -> $available") + Some(source -> batch) + case _ => None + } + } + + // A list of attributes that will need to be updated. + val replacements = new ArrayBuffer[(Attribute, Attribute)] + // Replace sources in the logical plan with data that has arrived since the last batch. + val withNewSources = logicalPlan transform { + case StreamingExecutionRelation(source, output) => + newData.get(source).map { data => + val newPlan = data.logicalPlan + assert(output.size == newPlan.output.size, + s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + + s"${Utils.truncatedString(newPlan.output, ",")}") + replacements ++= output.zip(newPlan.output) + newPlan + }.getOrElse { + LocalRelation(output, isStreaming = true) + } + } + + // Rewire the plan to use the new attributes that were returned by the source. + val replacementMap = AttributeMap(replacements) + val triggerLogicalPlan = withNewSources transformAllExpressions { + case a: Attribute if replacementMap.contains(a) => + replacementMap(a).withMetadata(a.metadata) + case ct: CurrentTimestamp => + CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, + ct.dataType) + case cd: CurrentDate => + CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, + cd.dataType, cd.timeZoneId) + } + + reportTimeTaken("queryPlanning") { + lastExecution = new IncrementalExecution( + sparkSessionToRunBatch, + triggerLogicalPlan, + outputMode, + checkpointFile("state"), + runId, + currentBatchId, + offsetSeqMetadata) + lastExecution.executedPlan // Force the lazy generation of execution plan + } + + val nextBatch = + new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema)) + + reportTimeTaken("addBatch") { + SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) { + sink.addBatch(currentBatchId, nextBatch) + } + } + + awaitProgressLock.lock() + try { + // Wake up any threads that are waiting for the stream to progress. 
+ awaitProgressLockCondition.signalAll() + } finally { + awaitProgressLock.unlock() + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 16063c02ce..7946889e85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -22,10 +22,9 @@ import java.nio.channels.ClosedByInterruptException import java.util.UUID import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.locks.{Condition, ReentrantLock} import scala.collection.mutable.{Map => MutableMap} -import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import com.google.common.util.concurrent.UncheckedExecutionException @@ -33,10 +32,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} -import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.reader.Offset @@ -58,7 +55,7 @@ case object TERMINATED extends State * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without * errors */ -class StreamExecution( +abstract class StreamExecution( override val sparkSession: SparkSession, override val name: String, private val checkpointRoot: String, @@ -72,16 +69,16 @@ class StreamExecution( import org.apache.spark.sql.streaming.StreamingQueryListener._ - private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay + protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay - private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain - require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive") + protected val minLogEntriesToMaintain: Int = sparkSession.sessionState.conf.minBatchesToRetain + require(minLogEntriesToMaintain > 0, "minBatchesToRetain has to be positive") /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. 
*/ - private val awaitBatchLock = new ReentrantLock(true) - private val awaitBatchLockCondition = awaitBatchLock.newCondition() + protected val awaitProgressLock = new ReentrantLock(true) + protected val awaitProgressLockCondition = awaitProgressLock.newCondition() private val initializationLatch = new CountDownLatch(1) private val startLatch = new CountDownLatch(1) @@ -90,9 +87,11 @@ class StreamExecution( val resolvedCheckpointRoot = { val checkpointPath = new Path(checkpointRoot) val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) - checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString + checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString } + def logicalPlan: LogicalPlan + /** * Tracks how much data we have processed and committed to the sink or state store from each * input source. @@ -160,36 +159,7 @@ class StreamExecution( /** * A list of unique sources in the query plan. This will be set when generating logical plan. */ - @volatile private var uniqueSources: Seq[Source] = Seq.empty - - override lazy val logicalPlan: LogicalPlan = { - assert(microBatchThread eq Thread.currentThread, - "logicalPlan must be initialized in StreamExecutionThread " + - s"but the current thread was ${Thread.currentThread}") - var nextSourceId = 0L - val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() - val _logicalPlan = analyzedPlan.transform { - case streamingRelation@StreamingRelation(dataSource, _, output) => - toExecutionRelationMap.getOrElseUpdate(streamingRelation, { - // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - val source = dataSource.createSource(metadataPath) - nextSourceId += 1 - // We still need to use the previous `output` instead of `source.schema` as attributes in - // "df.logicalPlan" has already used attributes of the previous `output`. - StreamingExecutionRelation(source, output)(sparkSession) - }) - } - sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source } - uniqueSources = sources.distinct - _logicalPlan - } - - private val triggerExecutor = trigger match { - case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) - case OneTimeTrigger => OneTimeExecutor() - case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") - } + @volatile protected var uniqueSources: Seq[Source] = Seq.empty /** Defines the internal state of execution */ private val state = new AtomicReference[State](INITIALIZING) @@ -215,13 +185,13 @@ class StreamExecution( * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a * running `KafkaConsumer` may cause endless loop. */ - val microBatchThread = - new StreamExecutionThread(s"stream execution thread for $prettyIdString") { + val queryExecutionThread: QueryExecutionThread = + new QueryExecutionThread(s"stream execution thread for $prettyIdString") { override def run(): Unit = { // To fix call site like "run at :0", we bridge the call site from the caller // thread to this micro batch thread sparkSession.sparkContext.setCallSite(callSite) - runBatches() + runStream() } } @@ -238,7 +208,7 @@ class StreamExecution( * fully processed, and its output was committed to the sink, hence no need to process it again. * This is used (for instance) during restart, to help identify which batch to run next. 
*/ - val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits")) + val commitLog = new CommitLog(sparkSession, checkpointFile("commits")) /** Whether all fields of the query have been initialized */ private def isInitialized: Boolean = state.get != INITIALIZING @@ -250,7 +220,7 @@ class StreamExecution( override def exception: Option[StreamingQueryException] = Option(streamDeathCause) /** Returns the path of a file with `name` in the checkpoint directory. */ - private def checkpointFile(name: String): String = + protected def checkpointFile(name: String): String = new Path(new Path(resolvedCheckpointRoot), name).toUri.toString /** @@ -259,20 +229,25 @@ class StreamExecution( */ def start(): Unit = { logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.") - microBatchThread.setDaemon(true) - microBatchThread.start() + queryExecutionThread.setDaemon(true) + queryExecutionThread.start() startLatch.await() // Wait until thread started and QueryStart event has been posted } /** - * Repeatedly attempts to run batches as data arrives. + * Run the activated stream until stopped. + */ + protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit + + /** + * Activate the stream and then wrap a callout to runActivatedStream, handling start and stop. * * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are * posted such that listeners are guaranteed to get a start event before a termination. * Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the * `start()` method returns. */ - private def runBatches(): Unit = { + private def runStream(): Unit = { try { sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString, interruptOnCancel = true) @@ -295,56 +270,18 @@ class StreamExecution( logicalPlan // Isolated spark session to run the batches with. - val sparkSessionToRunBatches = sparkSession.cloneSession() + val sparkSessionForStream = sparkSession.cloneSession() // Adaptive execution can change num shuffle partitions, disallow - sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") // Disable cost-based join optimization as we do not want stateful operations to be rearranged - sparkSessionToRunBatches.conf.set(SQLConf.CBO_ENABLED.key, "false") + sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false") offsetSeqMetadata = OffsetSeqMetadata( - batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionToRunBatches.conf) + batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) if (state.compareAndSet(INITIALIZING, ACTIVE)) { // Unblock `awaitInitialization` initializationLatch.countDown() - - triggerExecutor.execute(() => { - startTrigger() - - if (isActive) { - reportTimeTaken("triggerExecution") { - if (currentBatchId < 0) { - // We'll do this initialization only once - populateStartOffsets(sparkSessionToRunBatches) - sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) - logDebug(s"Stream running from $committedOffsets to $availableOffsets") - } else { - constructNextBatch() - } - if (dataAvailable) { - currentStatus = currentStatus.copy(isDataAvailable = true) - updateStatusMessage("Processing new data") - runBatch(sparkSessionToRunBatches) - } - } - // Report trigger as finished and construct progress object. 
- finishTrigger(dataAvailable) - if (dataAvailable) { - // Update committed offsets. - batchCommitLog.add(currentBatchId) - committedOffsets ++= availableOffsets - logDebug(s"batch ${currentBatchId} committed") - // We'll increase currentBatchId after we complete processing current batch's data - currentBatchId += 1 - sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) - } else { - currentStatus = currentStatus.copy(isDataAvailable = false) - updateStatusMessage("Waiting for data to arrive") - Thread.sleep(pollingDelayMs) - } - } - updateStatusMessage("Waiting for next trigger") - isActive - }) + runActivatedStream(sparkSessionForStream) updateStatusMessage("Stopped") } else { // `stop()` is already called. Let `finally` finish the cleanup. @@ -373,7 +310,7 @@ class StreamExecution( if (!NonFatal(e)) { throw e } - } finally microBatchThread.runUninterruptibly { + } finally queryExecutionThread.runUninterruptibly { // The whole `finally` block must run inside `runUninterruptibly` to avoid being interrupted // when a query is stopped by the user. We need to make sure the following codes finish // otherwise it may throw `InterruptedException` to `UncaughtExceptionHandler` (SPARK-21248). @@ -410,12 +347,12 @@ class StreamExecution( } } } finally { - awaitBatchLock.lock() + awaitProgressLock.lock() try { // Wake up any threads that are waiting for the stream to progress. - awaitBatchLockCondition.signalAll() + awaitProgressLockCondition.signalAll() } finally { - awaitBatchLock.unlock() + awaitProgressLock.unlock() } terminationLatch.countDown() } @@ -448,296 +385,6 @@ class StreamExecution( } } - /** - * Populate the start offsets to start the execution at the current offsets stored in the sink - * (i.e. avoid reprocessing data that we have already processed). This function must be called - * before any processing occurs and will populate the following fields: - * - currentBatchId - * - committedOffsets - * - availableOffsets - * The basic structure of this method is as follows: - * - * Identify (from the offset log) the offsets used to run the last batch - * IF last batch exists THEN - * Set the next batch to be executed as the last recovered batch - * Check the commit log to see which batch was committed last - * IF the last batch was committed THEN - * Call getBatch using the last batch start and end offsets - * // ^^^^ above line is needed since some sources assume last batch always re-executes - * Setup for a new batch i.e., start = last batch end, and identify new end - * DONE - * ELSE - * Identify a brand new batch - * DONE - */ - private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = { - offsetLog.getLatest() match { - case Some((latestBatchId, nextOffsets)) => - /* First assume that we are re-executing the latest known batch - * in the offset log */ - currentBatchId = latestBatchId - availableOffsets = nextOffsets.toStreamProgress(sources) - /* Initialize committed offsets to a committed batch, which at this - * is the second latest batch id in the offset log. 
*/ - if (latestBatchId != 0) { - val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse { - throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist") - } - committedOffsets = secondLatestBatchId.toStreamProgress(sources) - } - - // update offset metadata - nextOffsets.metadata.foreach { metadata => - OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf) - offsetSeqMetadata = OffsetSeqMetadata( - metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf) - } - - /* identify the current batch id: if commit log indicates we successfully processed the - * latest batch id in the offset log, then we can safely move to the next batch - * i.e., committedBatchId + 1 */ - batchCommitLog.getLatest() match { - case Some((latestCommittedBatchId, _)) => - if (latestBatchId == latestCommittedBatchId) { - /* The last batch was successfully committed, so we can safely process a - * new next batch but first: - * Make a call to getBatch using the offsets from previous batch. - * because certain sources (e.g., KafkaSource) assume on restart the last - * batch will be executed before getOffset is called again. */ - availableOffsets.foreach { ao: (Source, Offset) => - val (source, end) = ao - if (committedOffsets.get(source).map(_ != end).getOrElse(true)) { - val start = committedOffsets.get(source) - source.getBatch(start, end) - } - } - currentBatchId = latestCommittedBatchId + 1 - committedOffsets ++= availableOffsets - // Construct a new batch be recomputing availableOffsets - constructNextBatch() - } else if (latestCommittedBatchId < latestBatchId - 1) { - logWarning(s"Batch completion log latest batch id is " + - s"${latestCommittedBatchId}, which is not trailing " + - s"batchid $latestBatchId by one") - } - case None => logInfo("no commit log present") - } - logDebug(s"Resuming at batch $currentBatchId with committed offsets " + - s"$committedOffsets and available offsets $availableOffsets") - case None => // We are starting this stream for the first time. - logInfo(s"Starting new streaming query.") - currentBatchId = 0 - constructNextBatch() - } - } - - /** - * Returns true if there is any new data available to be processed. - */ - private def dataAvailable: Boolean = { - availableOffsets.exists { - case (source, available) => - committedOffsets - .get(source) - .map(committed => committed != available) - .getOrElse(true) - } - } - - /** - * Queries all of the sources to see if any new data is available. When there is new data the - * batchId counter is incremented and a new log entry is written with the newest offsets. - */ - private def constructNextBatch(): Unit = { - // Check to see what new data is available. - val hasNewData = { - awaitBatchLock.lock() - try { - val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s => - updateStatusMessage(s"Getting offsets from $s") - reportTimeTaken("getOffset") { - (s, s.getOffset) - } - }.toMap - availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get) - - if (dataAvailable) { - true - } else { - noNewData = true - false - } - } finally { - awaitBatchLock.unlock() - } - } - if (hasNewData) { - var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs - // Update the eventTime watermarks if we find any in the plan. 
- if (lastExecution != null) { - lastExecution.executedPlan.collect { - case e: EventTimeWatermarkExec => e - }.zipWithIndex.foreach { - case (e, index) if e.eventTimeStats.value.count > 0 => - logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") - val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs - val prevWatermarkMs = watermarkMsMap.get(index) - if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { - watermarkMsMap.put(index, newWatermarkMs) - } - - // Populate 0 if we haven't seen any data yet for this watermark node. - case (_, index) => - if (!watermarkMsMap.isDefinedAt(index)) { - watermarkMsMap.put(index, 0) - } - } - - // Update the global watermark to the minimum of all watermark nodes. - // This is the safest option, because only the global watermark is fault-tolerant. Making - // it the minimum of all individual watermarks guarantees it will never advance past where - // any individual watermark operator would be if it were in a plan by itself. - if(!watermarkMsMap.isEmpty) { - val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 - if (newWatermarkMs > batchWatermarkMs) { - logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") - batchWatermarkMs = newWatermarkMs - } else { - logDebug( - s"Event time didn't move: $newWatermarkMs < " + - s"$batchWatermarkMs") - } - } - } - offsetSeqMetadata = offsetSeqMetadata.copy( - batchWatermarkMs = batchWatermarkMs, - batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds - - updateStatusMessage("Writing offsets to log") - reportTimeTaken("walCommit") { - assert(offsetLog.add( - currentBatchId, - availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)), - s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") - logInfo(s"Committed offsets for batch $currentBatchId. " + - s"Metadata ${offsetSeqMetadata.toString}") - - // NOTE: The following code is correct because runBatches() processes exactly one - // batch at a time. If we add pipeline parallelism (multiple batches in flight at - // the same time), this cleanup logic will need to change. - - // Now that we've updated the scheduler's persistent checkpoint, it is safe for the - // sources to discard data from the previous batch. - if (currentBatchId != 0) { - val prevBatchOff = offsetLog.get(currentBatchId - 1) - if (prevBatchOff.isDefined) { - prevBatchOff.get.toStreamProgress(sources).foreach { - case (src, off) => src.commit(off) - } - } else { - throw new IllegalStateException(s"batch $currentBatchId doesn't exist") - } - } - - // It is now safe to discard the metadata beyond the minimum number to retain. - // Note that purge is exclusive, i.e. it purges everything before the target ID. - if (minBatchesToRetain < currentBatchId) { - offsetLog.purge(currentBatchId - minBatchesToRetain) - batchCommitLog.purge(currentBatchId - minBatchesToRetain) - } - } - } else { - awaitBatchLock.lock() - try { - // Wake up any threads that are waiting for the stream to progress. - awaitBatchLockCondition.signalAll() - } finally { - awaitBatchLock.unlock() - } - } - } - - /** - * Processes any data available between `availableOffsets` and `committedOffsets`. - * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with. - */ - private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = { - // Request unprocessed data from all sources. 
- newData = reportTimeTaken("getBatch") { - availableOffsets.flatMap { - case (source, available) - if committedOffsets.get(source).map(_ != available).getOrElse(true) => - val current = committedOffsets.get(source) - val batch = source.getBatch(current, available) - assert(batch.isStreaming, - s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" + - s"${batch.queryExecution.logical}") - logDebug(s"Retrieving data from $source: $current -> $available") - Some(source -> batch) - case _ => None - } - } - - // A list of attributes that will need to be updated. - val replacements = new ArrayBuffer[(Attribute, Attribute)] - // Replace sources in the logical plan with data that has arrived since the last batch. - val withNewSources = logicalPlan transform { - case StreamingExecutionRelation(source, output) => - newData.get(source).map { data => - val newPlan = data.logicalPlan - assert(output.size == newPlan.output.size, - s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + - s"${Utils.truncatedString(newPlan.output, ",")}") - replacements ++= output.zip(newPlan.output) - newPlan - }.getOrElse { - LocalRelation(output, isStreaming = true) - } - } - - // Rewire the plan to use the new attributes that were returned by the source. - val replacementMap = AttributeMap(replacements) - val triggerLogicalPlan = withNewSources transformAllExpressions { - case a: Attribute if replacementMap.contains(a) => - replacementMap(a).withMetadata(a.metadata) - case ct: CurrentTimestamp => - CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, - ct.dataType) - case cd: CurrentDate => - CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, - cd.dataType, cd.timeZoneId) - } - - reportTimeTaken("queryPlanning") { - lastExecution = new IncrementalExecution( - sparkSessionToRunBatch, - triggerLogicalPlan, - outputMode, - checkpointFile("state"), - runId, - currentBatchId, - offsetSeqMetadata) - lastExecution.executedPlan // Force the lazy generation of execution plan - } - - val nextBatch = - new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema)) - - reportTimeTaken("addBatch") { - SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) { - sink.addBatch(currentBatchId, nextBatch) - } - } - - awaitBatchLock.lock() - try { - // Wake up any threads that are waiting for the stream to progress. 
- awaitBatchLockCondition.signalAll() - } finally { - awaitBatchLock.unlock() - } - } - override protected def postEvent(event: StreamingQueryListener.Event): Unit = { sparkSession.streams.postListenerEvent(event) } @@ -762,10 +409,10 @@ class StreamExecution( // Set the state to TERMINATED so that the batching thread knows that it was interrupted // intentionally state.set(TERMINATED) - if (microBatchThread.isAlive) { + if (queryExecutionThread.isAlive) { sparkSession.sparkContext.cancelJobGroup(runId.toString) - microBatchThread.interrupt() - microBatchThread.join() + queryExecutionThread.interrupt() + queryExecutionThread.join() // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak sparkSession.sparkContext.cancelJobGroup(runId.toString) } @@ -784,21 +431,21 @@ class StreamExecution( } while (notDone) { - awaitBatchLock.lock() + awaitProgressLock.lock() try { - awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS) + awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS) if (streamDeathCause != null) { throw streamDeathCause } } finally { - awaitBatchLock.unlock() + awaitProgressLock.unlock() } } logDebug(s"Unblocked at $newOffset for $source") } /** A flag to indicate that a batch has completed with no new data available. */ - @volatile private var noNewData = false + @volatile protected var noNewData = false /** * Assert that the await APIs should not be called in the stream thread. Otherwise, it may cause @@ -806,7 +453,7 @@ class StreamExecution( * the stream thread forever. */ private def assertAwaitThread(): Unit = { - if (microBatchThread eq Thread.currentThread) { + if (queryExecutionThread eq Thread.currentThread) { throw new IllegalStateException( "Cannot wait for a query state from the same thread that is running the query") } @@ -833,11 +480,11 @@ class StreamExecution( throw streamDeathCause } if (!isActive) return - awaitBatchLock.lock() + awaitProgressLock.lock() try { noNewData = false while (true) { - awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS) + awaitProgressLockCondition.await(10000, TimeUnit.MILLISECONDS) if (streamDeathCause != null) { throw streamDeathCause } @@ -846,7 +493,7 @@ class StreamExecution( } } } finally { - awaitBatchLock.unlock() + awaitProgressLock.unlock() } } @@ -900,7 +547,7 @@ class StreamExecution( |Current Available Offsets: $availableOffsets | |Current State: $state - |Thread State: ${microBatchThread.getState}""".stripMargin + |Thread State: ${queryExecutionThread.getState}""".stripMargin if (includeLogicalPlan) { debugString + s"\n\nLogical Plan:\n$logicalPlan" } else { @@ -908,7 +555,7 @@ class StreamExecution( } } - private def getBatchDescriptionString: String = { + protected def getBatchDescriptionString: String = { val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString Option(name).map(_ + "
").getOrElse("") + s"id = $id
runId = $runId
batch = $batchDescription" @@ -920,7 +567,7 @@ object StreamExecution { } /** - * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread - * and will use `classOf[StreamExecutionThread]` to check. + * A special thread to run the stream query. Some codes require to run in the QueryExecutionThread + * and will use `classOf[QueryxecutionThread]` to check. */ -abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name) +abstract class QueryExecutionThread(name: String) extends UninterruptibleThread(name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 48b0ea20e5..555d6e23f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo "is not supported in streaming DataFrames/Datasets and will be disabled.") } - new StreamingQueryWrapper(new StreamExecution( + new StreamingQueryWrapper(new MicroBatchExecution( sparkSession, userSpecifiedName.orNull, checkpointLocation, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 47bc452bda..d6bef9ce07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -260,8 +260,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche CheckLastBatch((10, 5)), StopStream, AssertOnQuery { q => // purge commit and clear the sink - val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L - q.batchCommitLog.purge(commit) + val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L + q.commitLog.purge(commit) q.sink.asInstanceOf[MemorySink].clear() true }, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 7a2d9e3728..c5b57bca18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1024,7 +1024,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { expectedCompactInterval: Int): Boolean = { import CompactibleFileStreamLog._ - val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] + val fileSource = getSourcesFromStreamingQuery(execution).head val metadataLog = fileSource invokePrivate _metadataLog() if (isCompactionBatch(batchId, expectedCompactInterval)) { @@ -1100,8 +1100,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { CheckAnswer("keep1", "keep2", "keep3"), AssertOnQuery("check getBatch") { execution: StreamExecution => val _sources = PrivateMethod[Seq[Source]]('sources) - val fileSource = - (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] + val fileSource = getSourcesFromStreamingQuery(execution).head def verify(startId: Option[Int], endId: Int, expected: String*): Unit = { val start = startId.map(new FileStreamSourceOffset(_)) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 8163a1f91e..9e696b2236 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -276,7 +276,7 @@ class StreamSuite extends StreamTest { // Check the latest batchid in the commit log def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery = - AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId, + AssertOnQuery(_.commitLog.getLatest().get._1 == expectedId, s"commitLog's latest should be $expectedId") // Ensure that there has not been an incremental execution after restart diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 7a1ff89f2f..fb88c5d327 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -300,12 +300,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be if (currentStream != null) currentStream.committedOffsets.toString else "not started" def threadState = - if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead" - def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) { - s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}" - } else { - "" - } + if (currentStream != null && currentStream.queryExecutionThread.isAlive) "alive" else "dead" + + def threadStackTrace = + if (currentStream != null && currentStream.queryExecutionThread.isAlive) { + s"Thread stack trace: ${currentStream.queryExecutionThread.getStackTrace.mkString("\n")}" + } else { + "" + } def testState = s""" @@ -460,7 +462,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be verify(currentStream != null, "can not stop a stream that is not running") try failAfter(streamingTimeout) { currentStream.stop() - verify(!currentStream.microBatchThread.isAlive, + verify(!currentStream.queryExecutionThread.isAlive, s"microbatch thread not stopped") verify(!currentStream.isActive, "query.isActive() is false even after stopping") @@ -486,7 +488,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be currentStream.awaitTermination() } eventually("microbatch thread not stopped after termination with failure") { - assert(!currentStream.microBatchThread.isAlive) + assert(!currentStream.queryExecutionThread.isAlive) } verify(currentStream.exception === Some(thrownException), s"incorrect exception returned by query.exception()") @@ -614,7 +616,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be case e: org.scalatest.exceptions.TestFailedDueToTimeoutException => failTest("Timed out waiting for stream", e) } finally { - if (currentStream != null && currentStream.microBatchThread.isAlive) { + if (currentStream != null && currentStream.queryExecutionThread.isAlive) { currentStream.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index fa0313592b..38aa517131 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -300,7 +300,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() - q.batchCommitLog.purge(3) + q.commitLog.purge(3) // advance by a minute i.e., 90 seconds total clock.advance(60 * 1000L) true @@ -352,7 +352,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() - q.batchCommitLog.purge(3) + q.commitLog.purge(3) // advance by 60 days i.e., 90 days total clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) true diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index f813b77e3c..ad4d3abd01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -173,12 +173,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi StopStream, // clears out StreamTest state AssertOnQuery { q => // both commit log and offset log contain the same (latest) batch id - q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) == + q.commitLog.getLatest().map(_._1).getOrElse(-1L) == q.offsetLog.getLatest().map(_._1).getOrElse(-2L) }, AssertOnQuery { q => // blow away commit log and sink result - q.batchCommitLog.purge(1) + q.commitLog.purge(1) q.sink.asInstanceOf[MemorySink].clear() true },
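
The shape of the refactor is easiest to see in isolation: StreamExecution keeps the shared lifecycle (start/stop, checkpoint resolution, the renamed awaitProgressLock/CommitLog plumbing, and runStream()) and defers the per-trigger work to an abstract runActivatedStream(), which MicroBatchExecution fills in with the existing batch loop and which the anticipated ContinuousExecution would fill in with a long-running job. Below is a minimal, self-contained sketch of that split; the *Sketch class names, the simplified constructor, and ContinuousExecutionSketch are illustrative assumptions, not the real Spark signatures (the real classes take a SparkSession, checkpoint root, analyzed plan, Sink, Trigger, Clock, and OutputMode).

```scala
// Illustrative sketch only -- not the actual Spark classes.
abstract class StreamExecutionSketch(val name: String) {
  @volatile protected var active: Boolean = true

  /** Shared driver: common setup/teardown, then hand off to the concrete loop. */
  final def runStream(): Unit = {
    // In Spark this is where checkpoint resolution, listener events and
    // error handling happen around the mode-specific loop.
    runActivatedStream()
  }

  /** Each execution mode supplies its own processing loop. */
  protected def runActivatedStream(): Unit

  def stop(): Unit = { active = false }
}

/** Micro-batch mode: repeatedly construct and run discrete batches. */
class MicroBatchExecutionSketch(name: String) extends StreamExecutionSketch(name) {
  private var currentBatchId = 0L

  override protected def runActivatedStream(): Unit = {
    while (active && currentBatchId < 3) { // bounded here only so the sketch terminates
      println(s"[$name] running micro-batch $currentBatchId")
      currentBatchId += 1
    }
  }
}

/** Hypothetical continuous mode (not part of this patch): one long-running job. */
class ContinuousExecutionSketch(name: String) extends StreamExecutionSketch(name) {
  override protected def runActivatedStream(): Unit =
    println(s"[$name] launching a single continuous job instead of a batch loop")
}

object StreamExecutionSketchDemo extends App {
  new MicroBatchExecutionSketch("micro").runStream()
  new ContinuousExecutionSketch("continuous").runStream()
}
```

This is also why the patch renames the micro-batch-specific members (batchCommitLog to commitLog, awaitBatchLock to awaitProgressLock, microBatchThread to queryExecutionThread): the base class keeps only names that make sense for any execution mode.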