[SPARK-15077][SQL] Use a fair lock to avoid thread starvation in StreamExecution

## What changes were proposed in this pull request?

Right now `StreamExecution.awaitBatchLock` uses an unfair lock (a plain object monitor). `StreamExecution.awaitOffset` may block for too long and fail some tests with a timeout because `StreamExecution.constructNextBatch` keeps re-acquiring the lock and starving the waiting thread.

See: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.4/865/testReport/junit/org.apache.spark.sql.streaming/FileStreamSourceStressTestSuite/file_source_stress_test/

This PR uses a fair `ReentrantLock` to resolve the thread starvation issue.
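For reference, this is the idiom swap in isolation: the intrinsic monitor (`synchronized`/`wait`/`notifyAll`) has no fairness guarantee, while `new ReentrantLock(true)` hands the lock to waiting threads in roughly FIFO order. A minimal sketch of the correspondence (the names are illustrative, not the real `StreamExecution` members):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object FairMonitor {
  // `true` requests a fair lock: waiters acquire it in arrival order,
  // so a thread that grabs the lock in a tight loop cannot starve others.
  private val lock = new ReentrantLock(true)
  private val condition = lock.newCondition()

  // Equivalent of `monitor.synchronized { monitor.wait(100) }`.
  def awaitProgress(): Unit = {
    lock.lock()
    try condition.await(100, TimeUnit.MILLISECONDS)
    finally lock.unlock()
  }

  // Equivalent of `monitor.synchronized { monitor.notifyAll() }`.
  def signalProgress(): Unit = {
    lock.lock()
    try condition.signalAll()
    finally lock.unlock()
  }
}
```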

## How was this patch tested?

Modified `FileStreamSourceStressTestSuite.test("file source stress test")` to run the test code 100 times locally. Without this patch it always fails with a timeout.
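The suite change itself is not shown in this diff; a hypothetical sketch of the kind of loop used to repeat the stress test (`runOneStressIteration` is a made-up stand-in for the original test body):

```scala
// Hypothetical sketch only: repeat the body enough times that the
// starvation-induced timeout reproduces reliably without the fix.
test("file source stress test") {
  (1 to 100).foreach { i =>
    withClue(s"iteration $i: ") {
      runOneStressIteration() // stand-in for the original test body
    }
  }
}
```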

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12852 from zsxwing/SPARK-15077.
In `StreamExecution.scala`:

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming

 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock

 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
```
```diff
@@ -53,8 +54,12 @@ class StreamExecution(
     val trigger: Trigger)
   extends ContinuousQuery with Logging {

-  /** An monitor used to wait/notify when batches complete. */
-  private val awaitBatchLock = new Object
+  /**
+   * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
+   */
+  private val awaitBatchLock = new ReentrantLock(true)
+  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
+
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
```
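Why fairness matters here: an unfair `ReentrantLock` (or an intrinsic monitor) lets a releasing thread barge back in ahead of parked waiters, so a loop that repeatedly re-acquires the lock, as `constructNextBatch` effectively does, can hold a waiter out indefinitely. A small standalone sketch of the effect (not patch code; timings are illustrative and machine-dependent):

```scala
import java.util.concurrent.locks.ReentrantLock

// Standalone demo: a busy thread re-acquires the lock in a tight loop while
// another thread tries to get in once. With fair = false the second thread
// may wait a long time; with fair = true it is admitted on the next release.
object FairnessDemo extends App {
  def waitMicros(fair: Boolean): Long = {
    val lock = new ReentrantLock(fair)
    @volatile var stop = false
    val busy = new Thread(() => {
      while (!stop) {
        lock.lock()
        try { /* stand-in for constructNextBatch's critical section */ }
        finally lock.unlock()
      }
    })
    busy.start()
    Thread.sleep(10) // let the busy loop saturate the lock
    val start = System.nanoTime()
    lock.lock() // stand-in for awaitOffset trying to get in
    try stop = true finally lock.unlock()
    busy.join()
    (System.nanoTime() - start) / 1000
  }
  println(s"unfair: ${waitMicros(fair = false)} us, fair: ${waitMicros(fair = true)} us")
}
```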
```diff
@@ -242,17 +247,22 @@ class StreamExecution(
     // method. See SPARK-14131.
     //
     // Check to see what new data is available.
-    val hasNewData = awaitBatchLock.synchronized {
-      val newData = microBatchThread.runUninterruptibly {
-        uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-      }
-      availableOffsets ++= newData
+    val hasNewData = {
+      awaitBatchLock.lock()
+      try {
+        val newData = microBatchThread.runUninterruptibly {
+          uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+        }
+        availableOffsets ++= newData

-      if (dataAvailable) {
-        true
-      } else {
-        noNewData = true
-        false
+        if (dataAvailable) {
+          true
+        } else {
+          noNewData = true
+          false
+        }
+      } finally {
+        awaitBatchLock.unlock()
       }
     }

     if (hasNewData) {
```
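The `lock()`/`try`/`finally` shape above now replaces every former `synchronized` block; unlike `synchronized`, an explicit `ReentrantLock` is not released automatically, so the `finally` is what keeps an exception in the body from leaking the lock. If one wanted to avoid repeating the boilerplate, it could be factored into a loan-pattern helper, a hypothetical refactoring that is not part of this patch:

```scala
// Hypothetical helper (not in the patch): run `body` while holding
// awaitBatchLock, guaranteeing the unlock even if `body` throws.
private def withAwaitLock[T](body: => T): T = {
  awaitBatchLock.lock()
  try body finally awaitBatchLock.unlock()
}

// The hunk above would then shrink to roughly:
//   val hasNewData = withAwaitLock {
//     availableOffsets ++= microBatchThread.runUninterruptibly {
//       uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
//     }
//     if (dataAvailable) true else { noNewData = true; false }
//   }
```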
```diff
@@ -269,9 +279,12 @@ class StreamExecution(
       currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
-      awaitBatchLock.synchronized {
+      awaitBatchLock.lock()
+      try {
         // Wake up any threads that are waiting for the stream to progress.
-        awaitBatchLock.notifyAll()
+        awaitBatchLockCondition.signalAll()
+      } finally {
+        awaitBatchLock.unlock()
       }
     }
   }
```
```diff
@@ -332,9 +345,12 @@ class StreamExecution(
       new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
     sink.addBatch(currentBatchId - 1, nextBatch)

-    awaitBatchLock.synchronized {
+    awaitBatchLock.lock()
+    try {
       // Wake up any threads that are waiting for the stream to progress.
-      awaitBatchLock.notifyAll()
+      awaitBatchLockCondition.signalAll()
+    } finally {
+      awaitBatchLock.unlock()
     }

     val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
```
```diff
@@ -374,8 +390,12 @@ class StreamExecution(
     }

     while (notDone) {
       logInfo(s"Waiting until $newOffset at $source")
-      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
+      awaitBatchLock.lock()
+      try {
+        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+      } finally {
+        awaitBatchLock.unlock()
+      }
     }
     logDebug(s"Unblocked at $newOffset for $source")
   }
```
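One detail worth noting in `awaitOffset`: `Condition.await`, like `Object.wait`, may return early — on a signal meant for a different predicate, on the 100 ms timeout, or spuriously — which is why the call sits inside the `while (notDone)` loop that re-checks the predicate. The generic shape of that guarded wait (a sketch, not patch code):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Condition, ReentrantLock}

// Generic guarded-wait sketch: whatever woke us (signal, timeout, or a
// spurious wakeup), loop back and re-evaluate the predicate before
// declaring the wait finished.
def awaitUntil(lock: ReentrantLock, cond: Condition)(done: => Boolean): Unit = {
  while (!done) {
    lock.lock()
    try cond.await(100, TimeUnit.MILLISECONDS)
    finally lock.unlock()
  }
}
```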
```diff
@@ -383,16 +403,21 @@ class StreamExecution(
   /** A flag to indicate that a batch has completed with no new data available. */
   @volatile private var noNewData = false

-  override def processAllAvailable(): Unit = awaitBatchLock.synchronized {
-    noNewData = false
-    while (true) {
-      awaitBatchLock.wait(10000)
-      if (streamDeathCause != null) {
-        throw streamDeathCause
-      }
-      if (noNewData) {
-        return
+  override def processAllAvailable(): Unit = {
+    awaitBatchLock.lock()
+    try {
+      noNewData = false
+      while (true) {
+        awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS)
+        if (streamDeathCause != null) {
+          throw streamDeathCause
+        }
+        if (noNewData) {
+          return
+        }
       }
+    } finally {
+      awaitBatchLock.unlock()
     }
   }
```
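Taken together, the batch thread and `processAllAvailable` form a flag-plus-condition handshake: the batch thread sets the volatile `noNewData` flag (or records `streamDeathCause`) and calls `signalAll()` under the lock, while the caller loops on a timed `await`, re-checking both after every wakeup. A stripped-down, standalone sketch of that handshake (not patch code):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object Handshake {
  private val lock = new ReentrantLock(true) // fair, matching the patch
  private val cond = lock.newCondition()
  @volatile private var done = false

  // Batch-thread side: publish the flag, then wake every waiter.
  def markDone(): Unit = {
    lock.lock()
    try {
      done = true
      cond.signalAll()
    } finally lock.unlock()
  }

  // Caller side: the timeout bounds how long a missed signal can stall us,
  // since the flag is re-checked after every wakeup.
  def awaitDone(): Unit = {
    lock.lock()
    try {
      while (!done) cond.await(10, TimeUnit.SECONDS)
    } finally lock.unlock()
  }
}
```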