[SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution.

## What changes were proposed in this pull request?

StreamExecution is now an abstract base class, from which MicroBatchExecution (the current StreamExecution logic) inherits. When continuous processing is implemented, we'll add a new ContinuousExecution implementation of StreamExecution.

A few fields are also renamed to make them less microbatch-specific.
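For orientation, the split can be pictured with a minimal self-contained sketch (simplified signatures and hypothetical names, not the real Spark API; the actual constructor parameters appear in the diffs below):

```scala
// Simplified model of the split: the base class owns the shared lifecycle,
// and each execution mode supplies only its main loop.
abstract class StreamExecutionSketch(val name: String) {
  final def start(): Unit = {
    println(s"activating stream $name") // shared setup/teardown lives in the base class
    runActivatedStream()                // mode-specific main loop
  }
  protected def runActivatedStream(): Unit
}

class MicroBatchExecutionSketch(name: String) extends StreamExecutionSketch(name) {
  override protected def runActivatedStream(): Unit =
    println("run one micro-batch per trigger")
}

// Once continuous processing lands, a ContinuousExecution subclass would
// override runActivatedStream() with a long-running continuous loop instead.
object HierarchyDemo extends App {
  new MicroBatchExecutionSketch("demo").start()
}
```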

## How was this patch tested?

Refactoring only.

Author: Jose Torres <jose@databricks.com>

Closes #19926 from joseph-torres/continuous-refactor.
Authored by Jose Torres on 2017-12-14 14:31:21 -08:00; committed by Shixiong Zhu
parent 2fe16333d5
commit 59daf91b7c
10 changed files with 484 additions and 429 deletions


@@ -42,10 +42,10 @@ import org.apache.spark.sql.SparkSession
* line 1: version
* line 2: metadata (optional json string)
*/
class BatchCommitLog(sparkSession: SparkSession, path: String)
class CommitLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[String](sparkSession, path) {
import BatchCommitLog._
import CommitLog._
def add(batchId: Long): Unit = {
super.add(batchId, EMPTY_JSON)
@@ -53,7 +53,7 @@ class BatchCommitLog(sparkSession: SparkSession, path: String)
override def add(batchId: Long, metadata: String): Boolean = {
throw new UnsupportedOperationException(
"BatchCommitLog does not take any metadata, use 'add(batchId)' instead")
"CommitLog does not take any metadata, use 'add(batchId)' instead")
}
override protected def deserialize(in: InputStream): String = {
@@ -76,7 +76,7 @@ class BatchCommitLog(sparkSession: SparkSession, path: String)
}
}
object BatchCommitLog {
object CommitLog {
private val VERSION = 1
private val EMPTY_JSON = "{}"
}
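As a hedged aside, the renamed log's behavior can be exercised roughly as follows (the checkpoint path is invented, and CommitLog lives in Spark's internal execution.streaming package, so treat this as illustrative rather than a supported API):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.CommitLog

val spark = SparkSession.builder().master("local[2]").appName("commit-log-demo").getOrCreate()
val commitLog = new CommitLog(spark, "/tmp/demo-checkpoint/commits")

commitLog.add(0L) // marks batch 0 as committed; the stored payload is always EMPTY_JSON
commitLog.add(1L)
assert(commitLog.getLatest().map(_._1).contains(1L))

// The metadata-bearing overload is deliberately rejected:
// commitLog.add(2L, """{"extra": true}""") // throws UnsupportedOperationException

commitLog.purge(1L) // purge is exclusive: it removes entries strictly before batch 1
```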


@@ -0,0 +1,407 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.sources.v2.reader.Offset
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}
class MicroBatchExecution(
sparkSession: SparkSession,
name: String,
checkpointRoot: String,
analyzedPlan: LogicalPlan,
sink: Sink,
trigger: Trigger,
triggerClock: Clock,
outputMode: OutputMode,
deleteCheckpointOnStop: Boolean)
extends StreamExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
override lazy val logicalPlan: LogicalPlan = {
assert(queryExecutionThread eq Thread.currentThread,
"logicalPlan must be initialized in QueryExecutionThread " +
s"but the current thread was ${Thread.currentThread}")
var nextSourceId = 0L
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
val _logicalPlan = analyzedPlan.transform {
case streamingRelation@StreamingRelation(dataSource, _, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val source = dataSource.createSource(metadataPath)
nextSourceId += 1
// We still need to use the previous `output` instead of `source.schema`, because
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)(sparkSession)
})
}
sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
uniqueSources = sources.distinct
_logicalPlan
}
/**
* Repeatedly attempts to run batches as data arrives.
*/
protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
triggerExecutor.execute(() => {
startTrigger()
if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionForStream)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionForStream)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
commitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}
updateStatusMessage("Waiting for next trigger")
isActive
})
}
/**
* Populate the start offsets to start the execution at the current offsets stored in the sink
* (i.e. avoid reprocessing data that we have already processed). This function must be called
* before any processing occurs and will populate the following fields:
* - currentBatchId
* - committedOffsets
* - availableOffsets
* The basic structure of this method is as follows:
*
* Identify (from the offset log) the offsets used to run the last batch
* IF last batch exists THEN
* Set the next batch to be executed as the last recovered batch
* Check the commit log to see which batch was committed last
* IF the last batch was committed THEN
* Call getBatch using the last batch start and end offsets
* // ^^^^ above line is needed since some sources assume last batch always re-executes
* Setup for a new batch i.e., start = last batch end, and identify new end
* DONE
* ELSE
* Identify a brand new batch
* DONE
*/
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
offsetLog.getLatest() match {
case Some((latestBatchId, nextOffsets)) =>
/* First assume that we are re-executing the latest known batch
* in the offset log */
currentBatchId = latestBatchId
availableOffsets = nextOffsets.toStreamProgress(sources)
/* Initialize committed offsets to a committed batch, which at this
* point is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
}
committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}
// update offset metadata
nextOffsets.metadata.foreach { metadata =>
OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
offsetSeqMetadata = OffsetSeqMetadata(
metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
}
/* identify the current batch id: if commit log indicates we successfully processed the
* latest batch id in the offset log, then we can safely move to the next batch
* i.e., committedBatchId + 1 */
commitLog.getLatest() match {
case Some((latestCommittedBatchId, _)) =>
if (latestBatchId == latestCommittedBatchId) {
/* The last batch was successfully committed, so we can safely process the
* next batch, but first we make a call to getBatch using the offsets from
* the previous batch, because certain sources (e.g., KafkaSource) assume on
* restart that the last batch will be executed before getOffset is called
* again. */
availableOffsets.foreach { ao: (Source, Offset) =>
val (source, end) = ao
if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
val start = committedOffsets.get(source)
source.getBatch(start, end)
}
}
currentBatchId = latestCommittedBatchId + 1
committedOffsets ++= availableOffsets
// Construct a new batch by recomputing availableOffsets
constructNextBatch()
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
s"batchid $latestBatchId by one")
}
case None => logInfo("no commit log present")
}
logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
s"$committedOffsets and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
constructNextBatch()
}
}
/**
* Returns true if there is any new data available to be processed.
*/
private def dataAvailable: Boolean = {
availableOffsets.exists {
case (source, available) =>
committedOffsets
.get(source)
.map(committed => committed != available)
.getOrElse(true)
}
}
/**
* Queries all of the sources to see if any new data is available. When there is new data the
* batchId counter is incremented and a new log entry is written with the newest offsets.
*/
private def constructNextBatch(): Unit = {
// Check to see what new data is available.
val hasNewData = {
awaitProgressLock.lock()
try {
val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
}.toMap
availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
if (dataAvailable) {
true
} else {
noNewData = true
false
}
} finally {
awaitProgressLock.unlock()
}
}
if (hasNewData) {
var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
// Update the eventTime watermarks if we find any in the plan.
if (lastExecution != null) {
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec => e
}.zipWithIndex.foreach {
case (e, index) if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
val prevWatermarkMs = watermarkMsMap.get(index)
if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
watermarkMsMap.put(index, newWatermarkMs)
}
// Populate 0 if we haven't seen any data yet for this watermark node.
case (_, index) =>
if (!watermarkMsMap.isDefinedAt(index)) {
watermarkMsMap.put(index, 0)
}
}
// Update the global watermark to the minimum of all watermark nodes.
// This is the safest option, because only the global watermark is fault-tolerant. Making
// it the minimum of all individual watermarks guarantees it will never advance past where
// any individual watermark operator would be if it were in a plan by itself.
if (watermarkMsMap.nonEmpty) {
val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
if (newWatermarkMs > batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
batchWatermarkMs = newWatermarkMs
} else {
logDebug(
s"Event time didn't move: $newWatermarkMs < " +
s"$batchWatermarkMs")
}
}
}
offsetSeqMetadata = offsetSeqMetadata.copy(
batchWatermarkMs = batchWatermarkMs,
batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
currentBatchId,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
s"Metadata ${offsetSeqMetadata.toString}")
// NOTE: The following code is correct because runStream() processes exactly one
// batch at a time. If we add pipeline parallelism (multiple batches in flight at
// the same time), this cleanup logic will need to change.
// Now that we've updated the scheduler's persistent checkpoint, it is safe for the
// sources to discard data from the previous batch.
if (currentBatchId != 0) {
val prevBatchOff = offsetLog.get(currentBatchId - 1)
if (prevBatchOff.isDefined) {
prevBatchOff.get.toStreamProgress(sources).foreach {
case (src, off) => src.commit(off)
}
} else {
throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
}
}
// It is now safe to discard the metadata beyond the minimum number to retain.
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minLogEntriesToMaintain < currentBatchId) {
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}
}
} else {
awaitProgressLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
}
}
}
/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
* @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
*/
private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
// Request unprocessed data from all sources.
newData = reportTimeTaken("getBatch") {
availableOffsets.flatMap {
case (source, available)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
val batch = source.getBatch(current, available)
assert(batch.isStreaming,
s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
s"${batch.queryExecution.logical}")
logDebug(s"Retrieving data from $source: $current -> $available")
Some(source -> batch)
case _ => None
}
}
// A list of attributes that will need to be updated.
val replacements = new ArrayBuffer[(Attribute, Attribute)]
// Replace sources in the logical plan with data that has arrived since the last batch.
val withNewSources = logicalPlan transform {
case StreamingExecutionRelation(source, output) =>
newData.get(source).map { data =>
val newPlan = data.logicalPlan
assert(output.size == newPlan.output.size,
s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
s"${Utils.truncatedString(newPlan.output, ",")}")
replacements ++= output.zip(newPlan.output)
newPlan
}.getOrElse {
LocalRelation(output, isStreaming = true)
}
}
// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) =>
replacementMap(a).withMetadata(a.metadata)
case ct: CurrentTimestamp =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType)
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType, cd.timeZoneId)
}
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
runId,
currentBatchId,
offsetSeqMetadata)
lastExecution.executedPlan // Force the lazy generation of execution plan
}
val nextBatch =
new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink.addBatch(currentBatchId, nextBatch)
}
}
awaitProgressLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
}
}
}
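To make the global-watermark rule in constructNextBatch() above concrete, here is a standalone sketch with hypothetical numbers: the global watermark is the minimum across all watermark operators and only ever advances.

```scala
// Two watermark operators, each tracking (max observed event time - its delay), in ms.
val watermarkMsMap = Map(
  0 -> 5000L, // operator 0: max event time 15000 ms, delay 10000 ms
  1 -> 2000L) // operator 1: max event time 12000 ms, delay 10000 ms
val batchWatermarkMs = 1000L // global watermark carried over from the previous batch

// Take the minimum across operators and advance only if it moved forward,
// mirroring the minBy(_._2) logic in constructNextBatch().
val candidateMs = watermarkMsMap.values.min
val nextWatermarkMs = if (candidateMs > batchWatermarkMs) candidateMs else batchWatermarkMs
assert(nextWatermarkMs == 2000L)
```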


@@ -22,10 +22,9 @@ import java.nio.channels.ClosedByInterruptException
import java.util.UUID
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.locks.{Condition, ReentrantLock}
import scala.collection.mutable.{Map => MutableMap}
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import com.google.common.util.concurrent.UncheckedExecutionException
@@ -33,10 +32,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.reader.Offset
@@ -58,7 +55,7 @@ case object TERMINATED extends State
* @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
* errors
*/
class StreamExecution(
abstract class StreamExecution(
override val sparkSession: SparkSession,
override val name: String,
private val checkpointRoot: String,
@@ -72,16 +69,16 @@ class StreamExecution(
import org.apache.spark.sql.streaming.StreamingQueryListener._
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay
private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
protected val minLogEntriesToMaintain: Int = sparkSession.sessionState.conf.minBatchesToRetain
require(minLogEntriesToMaintain > 0, "minBatchesToRetain has to be positive")
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
private val awaitBatchLock = new ReentrantLock(true)
private val awaitBatchLockCondition = awaitBatchLock.newCondition()
protected val awaitProgressLock = new ReentrantLock(true)
protected val awaitProgressLockCondition = awaitProgressLock.newCondition()
private val initializationLatch = new CountDownLatch(1)
private val startLatch = new CountDownLatch(1)
@@ -90,9 +87,11 @@ class StreamExecution(
val resolvedCheckpointRoot = {
val checkpointPath = new Path(checkpointRoot)
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
}
def logicalPlan: LogicalPlan
/**
* Tracks how much data we have processed and committed to the sink or state store from each
* input source.
@@ -160,36 +159,7 @@ class StreamExecution(
/**
* A list of unique sources in the query plan. This will be set when generating logical plan.
*/
@volatile private var uniqueSources: Seq[Source] = Seq.empty
override lazy val logicalPlan: LogicalPlan = {
assert(microBatchThread eq Thread.currentThread,
"logicalPlan must be initialized in StreamExecutionThread " +
s"but the current thread was ${Thread.currentThread}")
var nextSourceId = 0L
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
val _logicalPlan = analyzedPlan.transform {
case streamingRelation@StreamingRelation(dataSource, _, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val source = dataSource.createSource(metadataPath)
nextSourceId += 1
// We still need to use the previous `output` instead of `source.schema`, because
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)(sparkSession)
})
}
sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
uniqueSources = sources.distinct
_logicalPlan
}
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
@volatile protected var uniqueSources: Seq[Source] = Seq.empty
/** Defines the internal state of execution */
private val state = new AtomicReference[State](INITIALIZING)
@@ -215,13 +185,13 @@ class StreamExecution(
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
* running `KafkaConsumer` may cause endless loop.
*/
val microBatchThread =
new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
val queryExecutionThread: QueryExecutionThread =
new QueryExecutionThread(s"stream execution thread for $prettyIdString") {
override def run(): Unit = {
// To fix call site like "run at <unknown>:0", we bridge the call site from the caller
// thread to this micro batch thread
sparkSession.sparkContext.setCallSite(callSite)
runBatches()
runStream()
}
}
@@ -238,7 +208,7 @@ class StreamExecution(
* fully processed, and its output was committed to the sink, hence no need to process it again.
* This is used (for instance) during restart, to help identify which batch to run next.
*/
val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING
@@ -250,7 +220,7 @@ class StreamExecution(
override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
/** Returns the path of a file with `name` in the checkpoint directory. */
private def checkpointFile(name: String): String =
protected def checkpointFile(name: String): String =
new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
/**
@@ -259,20 +229,25 @@
*/
def start(): Unit = {
logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
microBatchThread.setDaemon(true)
microBatchThread.start()
queryExecutionThread.setDaemon(true)
queryExecutionThread.start()
startLatch.await() // Wait until thread started and QueryStart event has been posted
}
/**
* Repeatedly attempts to run batches as data arrives.
* Run the activated stream until stopped.
*/
protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit
/**
* Activate the stream and then wrap a callout to runActivatedStream, handling start and stop.
*
* Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are
* posted such that listeners are guaranteed to get a start event before a termination.
* Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the
* `start()` method returns.
*/
private def runBatches(): Unit = {
private def runStream(): Unit = {
try {
sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
interruptOnCancel = true)
@@ -295,56 +270,18 @@ class StreamExecution(
logicalPlan
// Isolated spark session to run the batches with.
val sparkSessionToRunBatches = sparkSession.cloneSession()
val sparkSessionForStream = sparkSession.cloneSession()
// Adaptive execution can change num shuffle partitions, disallow
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
// Disable cost-based join optimization as we do not want stateful operations to be rearranged
sparkSessionToRunBatches.conf.set(SQLConf.CBO_ENABLED.key, "false")
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
offsetSeqMetadata = OffsetSeqMetadata(
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionToRunBatches.conf)
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
if (state.compareAndSet(INITIALIZING, ACTIVE)) {
// Unblock `awaitInitialization`
initializationLatch.countDown()
triggerExecutor.execute(() => {
startTrigger()
if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionToRunBatches)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionToRunBatches)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
batchCommitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}
updateStatusMessage("Waiting for next trigger")
isActive
})
runActivatedStream(sparkSessionForStream)
updateStatusMessage("Stopped")
} else {
// `stop()` is already called. Let `finally` finish the cleanup.
@@ -373,7 +310,7 @@ class StreamExecution(
if (!NonFatal(e)) {
throw e
}
} finally microBatchThread.runUninterruptibly {
} finally queryExecutionThread.runUninterruptibly {
// The whole `finally` block must run inside `runUninterruptibly` to avoid being interrupted
// when a query is stopped by the user. We need to make sure the following codes finish
// otherwise it may throw `InterruptedException` to `UncaughtExceptionHandler` (SPARK-21248).
@@ -410,12 +347,12 @@ class StreamExecution(
}
}
} finally {
awaitBatchLock.lock()
awaitProgressLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
awaitBatchLockCondition.signalAll()
awaitProgressLockCondition.signalAll()
} finally {
awaitBatchLock.unlock()
awaitProgressLock.unlock()
}
terminationLatch.countDown()
}
@@ -448,296 +385,6 @@ class StreamExecution(
}
}
/**
* Populate the start offsets to start the execution at the current offsets stored in the sink
* (i.e. avoid reprocessing data that we have already processed). This function must be called
* before any processing occurs and will populate the following fields:
* - currentBatchId
* - committedOffsets
* - availableOffsets
* The basic structure of this method is as follows:
*
* Identify (from the offset log) the offsets used to run the last batch
* IF last batch exists THEN
* Set the next batch to be executed as the last recovered batch
* Check the commit log to see which batch was committed last
* IF the last batch was committed THEN
* Call getBatch using the last batch start and end offsets
* // ^^^^ above line is needed since some sources assume last batch always re-executes
* Setup for a new batch i.e., start = last batch end, and identify new end
* DONE
* ELSE
* Identify a brand new batch
* DONE
*/
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
offsetLog.getLatest() match {
case Some((latestBatchId, nextOffsets)) =>
/* First assume that we are re-executing the latest known batch
* in the offset log */
currentBatchId = latestBatchId
availableOffsets = nextOffsets.toStreamProgress(sources)
/* Initialize committed offsets to a committed batch, which at this
* point is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
}
committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}
// update offset metadata
nextOffsets.metadata.foreach { metadata =>
OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
offsetSeqMetadata = OffsetSeqMetadata(
metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
}
/* identify the current batch id: if commit log indicates we successfully processed the
* latest batch id in the offset log, then we can safely move to the next batch
* i.e., committedBatchId + 1 */
batchCommitLog.getLatest() match {
case Some((latestCommittedBatchId, _)) =>
if (latestBatchId == latestCommittedBatchId) {
/* The last batch was successfully committed, so we can safely process the
* next batch, but first we make a call to getBatch using the offsets from
* the previous batch, because certain sources (e.g., KafkaSource) assume on
* restart that the last batch will be executed before getOffset is called
* again. */
availableOffsets.foreach { ao: (Source, Offset) =>
val (source, end) = ao
if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
val start = committedOffsets.get(source)
source.getBatch(start, end)
}
}
currentBatchId = latestCommittedBatchId + 1
committedOffsets ++= availableOffsets
// Construct a new batch by recomputing availableOffsets
constructNextBatch()
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
s"batchid $latestBatchId by one")
}
case None => logInfo("no commit log present")
}
logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
s"$committedOffsets and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
constructNextBatch()
}
}
/**
* Returns true if there is any new data available to be processed.
*/
private def dataAvailable: Boolean = {
availableOffsets.exists {
case (source, available) =>
committedOffsets
.get(source)
.map(committed => committed != available)
.getOrElse(true)
}
}
/**
* Queries all of the sources to see if any new data is available. When there is new data the
* batchId counter is incremented and a new log entry is written with the newest offsets.
*/
private def constructNextBatch(): Unit = {
// Check to see what new data is available.
val hasNewData = {
awaitBatchLock.lock()
try {
val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
}.toMap
availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
if (dataAvailable) {
true
} else {
noNewData = true
false
}
} finally {
awaitBatchLock.unlock()
}
}
if (hasNewData) {
var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
// Update the eventTime watermarks if we find any in the plan.
if (lastExecution != null) {
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec => e
}.zipWithIndex.foreach {
case (e, index) if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
val prevWatermarkMs = watermarkMsMap.get(index)
if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
watermarkMsMap.put(index, newWatermarkMs)
}
// Populate 0 if we haven't seen any data yet for this watermark node.
case (_, index) =>
if (!watermarkMsMap.isDefinedAt(index)) {
watermarkMsMap.put(index, 0)
}
}
// Update the global watermark to the minimum of all watermark nodes.
// This is the safest option, because only the global watermark is fault-tolerant. Making
// it the minimum of all individual watermarks guarantees it will never advance past where
// any individual watermark operator would be if it were in a plan by itself.
if (watermarkMsMap.nonEmpty) {
val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
if (newWatermarkMs > batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
batchWatermarkMs = newWatermarkMs
} else {
logDebug(
s"Event time didn't move: $newWatermarkMs < " +
s"$batchWatermarkMs")
}
}
}
offsetSeqMetadata = offsetSeqMetadata.copy(
batchWatermarkMs = batchWatermarkMs,
batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
currentBatchId,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
s"Metadata ${offsetSeqMetadata.toString}")
// NOTE: The following code is correct because runBatches() processes exactly one
// batch at a time. If we add pipeline parallelism (multiple batches in flight at
// the same time), this cleanup logic will need to change.
// Now that we've updated the scheduler's persistent checkpoint, it is safe for the
// sources to discard data from the previous batch.
if (currentBatchId != 0) {
val prevBatchOff = offsetLog.get(currentBatchId - 1)
if (prevBatchOff.isDefined) {
prevBatchOff.get.toStreamProgress(sources).foreach {
case (src, off) => src.commit(off)
}
} else {
throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
}
}
// It is now safe to discard the metadata beyond the minimum number to retain.
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minBatchesToRetain < currentBatchId) {
offsetLog.purge(currentBatchId - minBatchesToRetain)
batchCommitLog.purge(currentBatchId - minBatchesToRetain)
}
}
} else {
awaitBatchLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
awaitBatchLockCondition.signalAll()
} finally {
awaitBatchLock.unlock()
}
}
}
/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
* @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
*/
private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
// Request unprocessed data from all sources.
newData = reportTimeTaken("getBatch") {
availableOffsets.flatMap {
case (source, available)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
val batch = source.getBatch(current, available)
assert(batch.isStreaming,
s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
s"${batch.queryExecution.logical}")
logDebug(s"Retrieving data from $source: $current -> $available")
Some(source -> batch)
case _ => None
}
}
// A list of attributes that will need to be updated.
val replacements = new ArrayBuffer[(Attribute, Attribute)]
// Replace sources in the logical plan with data that has arrived since the last batch.
val withNewSources = logicalPlan transform {
case StreamingExecutionRelation(source, output) =>
newData.get(source).map { data =>
val newPlan = data.logicalPlan
assert(output.size == newPlan.output.size,
s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
s"${Utils.truncatedString(newPlan.output, ",")}")
replacements ++= output.zip(newPlan.output)
newPlan
}.getOrElse {
LocalRelation(output, isStreaming = true)
}
}
// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) =>
replacementMap(a).withMetadata(a.metadata)
case ct: CurrentTimestamp =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType)
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType, cd.timeZoneId)
}
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
runId,
currentBatchId,
offsetSeqMetadata)
lastExecution.executedPlan // Force the lazy generation of execution plan
}
val nextBatch =
new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink.addBatch(currentBatchId, nextBatch)
}
}
awaitBatchLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
awaitBatchLockCondition.signalAll()
} finally {
awaitBatchLock.unlock()
}
}
override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
sparkSession.streams.postListenerEvent(event)
}
@@ -762,10 +409,10 @@ class StreamExecution(
// Set the state to TERMINATED so that the batching thread knows that it was interrupted
// intentionally
state.set(TERMINATED)
if (microBatchThread.isAlive) {
if (queryExecutionThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
microBatchThread.interrupt()
microBatchThread.join()
queryExecutionThread.interrupt()
queryExecutionThread.join()
// queryExecutionThread may spawn new jobs, so we need to cancel again to prevent a leak
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
@@ -784,21 +431,21 @@
}
while (notDone) {
awaitBatchLock.lock()
awaitProgressLock.lock()
try {
awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
if (streamDeathCause != null) {
throw streamDeathCause
}
} finally {
awaitBatchLock.unlock()
awaitProgressLock.unlock()
}
}
logDebug(s"Unblocked at $newOffset for $source")
}
/** A flag to indicate that a batch has completed with no new data available. */
@volatile private var noNewData = false
@volatile protected var noNewData = false
/**
* Assert that the await APIs should not be called in the stream thread. Otherwise, it may cause
@@ -806,7 +453,7 @@ class StreamExecution(
* the stream thread forever.
*/
private def assertAwaitThread(): Unit = {
if (microBatchThread eq Thread.currentThread) {
if (queryExecutionThread eq Thread.currentThread) {
throw new IllegalStateException(
"Cannot wait for a query state from the same thread that is running the query")
}
@@ -833,11 +480,11 @@ class StreamExecution(
throw streamDeathCause
}
if (!isActive) return
awaitBatchLock.lock()
awaitProgressLock.lock()
try {
noNewData = false
while (true) {
awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS)
awaitProgressLockCondition.await(10000, TimeUnit.MILLISECONDS)
if (streamDeathCause != null) {
throw streamDeathCause
}
@@ -846,7 +493,7 @@ class StreamExecution(
}
}
} finally {
awaitBatchLock.unlock()
awaitProgressLock.unlock()
}
}
@@ -900,7 +547,7 @@ class StreamExecution(
|Current Available Offsets: $availableOffsets
|
|Current State: $state
|Thread State: ${microBatchThread.getState}""".stripMargin
|Thread State: ${queryExecutionThread.getState}""".stripMargin
if (includeLogicalPlan) {
debugString + s"\n\nLogical Plan:\n$logicalPlan"
} else {
@@ -908,7 +555,7 @@ class StreamExecution(
}
}
private def getBatchDescriptionString: String = {
protected def getBatchDescriptionString: String = {
val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
Option(name).map(_ + "<br/>").getOrElse("") +
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
@@ -920,7 +567,7 @@ object StreamExecution {
}
/**
* A special thread to run the stream query. Some codes require to run in the StreamExecutionThread
* and will use `classOf[StreamExecutionThread]` to check.
* A special thread to run the stream query. Some code requires running in the
* QueryExecutionThread and will use `classOf[QueryExecutionThread]` to check.
*/
abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name)
abstract class QueryExecutionThread(name: String) extends UninterruptibleThread(name)
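The check the comment alludes to might look like the following hedged sketch (the helper name is hypothetical; actual call sites vary):

```scala
// Hypothetical guard for code that must run on the stream's uninterruptible
// query thread, using the classOf check mentioned above.
def assertOnQueryExecutionThread(): Unit = {
  assert(
    classOf[QueryExecutionThread].isInstance(Thread.currentThread),
    s"expected a QueryExecutionThread but was ${Thread.currentThread}")
}
```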


@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
"is not supported in streaming DataFrames/Datasets and will be disabled.")
}
new StreamingQueryWrapper(new StreamExecution(
new StreamingQueryWrapper(new MicroBatchExecution(
sparkSession,
userSpecifiedName.orNull,
checkpointLocation,


@@ -260,8 +260,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
CheckLastBatch((10, 5)),
StopStream,
AssertOnQuery { q => // purge commit and clear the sink
val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
q.batchCommitLog.purge(commit)
val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
q.commitLog.purge(commit)
q.sink.asInstanceOf[MemorySink].clear()
true
},


@@ -1024,7 +1024,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
expectedCompactInterval: Int): Boolean = {
import CompactibleFileStreamLog._
val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
val fileSource = getSourcesFromStreamingQuery(execution).head
val metadataLog = fileSource invokePrivate _metadataLog()
if (isCompactionBatch(batchId, expectedCompactInterval)) {
@@ -1100,8 +1100,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
CheckAnswer("keep1", "keep2", "keep3"),
AssertOnQuery("check getBatch") { execution: StreamExecution =>
val _sources = PrivateMethod[Seq[Source]]('sources)
val fileSource =
(execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
val fileSource = getSourcesFromStreamingQuery(execution).head
def verify(startId: Option[Int], endId: Int, expected: String*): Unit = {
val start = startId.map(new FileStreamSourceOffset(_))


@@ -276,7 +276,7 @@ class StreamSuite extends StreamTest {
// Check the latest batchid in the commit log
def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId,
AssertOnQuery(_.commitLog.getLatest().get._1 == expectedId,
s"commitLog's latest should be $expectedId")
// Ensure that there has not been an incremental execution after restart


@@ -300,12 +300,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
if (currentStream != null) currentStream.committedOffsets.toString else "not started"
def threadState =
if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
} else {
""
}
if (currentStream != null && currentStream.queryExecutionThread.isAlive) "alive" else "dead"
def threadStackTrace =
if (currentStream != null && currentStream.queryExecutionThread.isAlive) {
s"Thread stack trace: ${currentStream.queryExecutionThread.getStackTrace.mkString("\n")}"
} else {
""
}
def testState =
s"""
@@ -460,7 +462,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
verify(currentStream != null, "can not stop a stream that is not running")
try failAfter(streamingTimeout) {
currentStream.stop()
verify(!currentStream.microBatchThread.isAlive,
verify(!currentStream.queryExecutionThread.isAlive,
s"microbatch thread not stopped")
verify(!currentStream.isActive,
"query.isActive() is false even after stopping")
@@ -486,7 +488,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
currentStream.awaitTermination()
}
eventually("microbatch thread not stopped after termination with failure") {
assert(!currentStream.microBatchThread.isAlive)
assert(!currentStream.queryExecutionThread.isAlive)
}
verify(currentStream.exception === Some(thrownException),
s"incorrect exception returned by query.exception()")
@@ -614,7 +616,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
failTest("Timed out waiting for stream", e)
} finally {
if (currentStream != null && currentStream.microBatchThread.isAlive) {
if (currentStream != null && currentStream.queryExecutionThread.isAlive) {
currentStream.stop()
}


@@ -300,7 +300,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
q.batchCommitLog.purge(3)
q.commitLog.purge(3)
// advance by a minute i.e., 90 seconds total
clock.advance(60 * 1000L)
true
@@ -352,7 +352,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
q.batchCommitLog.purge(3)
q.commitLog.purge(3)
// advance by 60 days i.e., 90 days total
clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
true


@@ -173,12 +173,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
StopStream, // clears out StreamTest state
AssertOnQuery { q =>
// both commit log and offset log contain the same (latest) batch id
q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) ==
q.commitLog.getLatest().map(_._1).getOrElse(-1L) ==
q.offsetLog.getLatest().map(_._1).getOrElse(-2L)
},
AssertOnQuery { q =>
// blow away commit log and sink result
q.batchCommitLog.purge(1)
q.commitLog.purge(1)
q.sink.asInstanceOf[MemorySink].clear()
true
},