[SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6058 from tdas/SPARK-7530 and squashes the following commits:
80ee0e6 [Tathagata Das] STARTED --> ACTIVE
3da6547 [Tathagata Das] Added synchronized
dd88444 [Tathagata Das] Added more docs
e1a8505 [Tathagata Das] Fixed comment length
89f9980 [Tathagata Das] Change to Java enum and added Java test
7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
dd4e702 [Tathagata Das] Addressed comments.
3d56106 [Tathagata Das] Added Mima excludes
2b86ba1 [Tathagata Das] Added scala docs.
1722433 [Tathagata Das] Fixed style
976b094 [Tathagata Das] Added license
0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState
(cherry picked from commit f9c7580ada
)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
This commit is contained in:
parent
f188815989
commit
c16b47f9ed
|
@ -106,6 +106,10 @@ object MimaExcludes {
|
|||
"org.apache.spark.sql.parquet.ParquetTestData$"),
|
||||
ProblemFilters.exclude[MissingClassProblem](
|
||||
"org.apache.spark.sql.parquet.TestGroupWriteSupport")
|
||||
) ++ Seq(
|
||||
// SPARK-7530 Added StreamingContext.getState()
|
||||
ProblemFilters.exclude[MissingMethodProblem](
|
||||
"org.apache.spark.streaming.StreamingContext.state_=")
|
||||
)
|
||||
|
||||
case v if v.startsWith("1.3") =>
|
||||
|
|
|
@ -32,10 +32,11 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
|
|||
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.annotation.Experimental
|
||||
import org.apache.spark.annotation.{DeveloperApi, Experimental}
|
||||
import org.apache.spark.input.FixedLengthBinaryInputFormat
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.streaming.StreamingContextState._
|
||||
import org.apache.spark.streaming.dstream._
|
||||
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
|
||||
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
|
||||
|
@ -195,14 +196,7 @@ class StreamingContext private[streaming] (
|
|||
assert(env.metricsSystem != null)
|
||||
env.metricsSystem.registerSource(streamingSource)
|
||||
|
||||
/** Enumeration to identify current state of the StreamingContext */
|
||||
private[streaming] object StreamingContextState extends Enumeration {
|
||||
type CheckpointState = Value
|
||||
val Initialized, Started, Stopped = Value
|
||||
}
|
||||
|
||||
import StreamingContextState._
|
||||
private[streaming] var state = Initialized
|
||||
private var state: StreamingContextState = INITIALIZED
|
||||
|
||||
private val startSite = new AtomicReference[CallSite](null)
|
||||
|
||||
|
@ -516,6 +510,21 @@ class StreamingContext private[streaming] (
|
|||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
*
|
||||
* Return the current state of the context. The context can be in three possible states -
|
||||
* - StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
|
||||
* Input DStreams, transformations and output operations can be created on the context.
|
||||
* - StreamingContextState.ACTIVE - The context has been started, and been not stopped.
|
||||
* Input DStreams, transformations and output operations cannot be created on the context.
|
||||
* - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
|
||||
*/
|
||||
@DeveloperApi
|
||||
def getState(): StreamingContextState = synchronized {
|
||||
state
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the execution of the streams.
|
||||
*
|
||||
|
@ -523,11 +532,13 @@ class StreamingContext private[streaming] (
|
|||
*/
|
||||
def start(): Unit = synchronized {
|
||||
import StreamingContext._
|
||||
if (state == Started) {
|
||||
throw new SparkException("StreamingContext has already been started")
|
||||
}
|
||||
if (state == Stopped) {
|
||||
throw new SparkException("StreamingContext has already been stopped")
|
||||
state match {
|
||||
case INITIALIZED =>
|
||||
// good to start
|
||||
case ACTIVE =>
|
||||
throw new SparkException("StreamingContext has already been started")
|
||||
case STOPPED =>
|
||||
throw new SparkException("StreamingContext has already been stopped")
|
||||
}
|
||||
validate()
|
||||
startSite.set(DStream.getCreationSite())
|
||||
|
@ -536,7 +547,7 @@ class StreamingContext private[streaming] (
|
|||
assertNoOtherContextIsActive()
|
||||
scheduler.start()
|
||||
uiTab.foreach(_.attach())
|
||||
state = Started
|
||||
state = StreamingContextState.ACTIVE
|
||||
setActiveContext(this)
|
||||
}
|
||||
}
|
||||
|
@ -598,22 +609,26 @@ class StreamingContext private[streaming] (
|
|||
* received data to be completed
|
||||
*/
|
||||
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
|
||||
state match {
|
||||
case Initialized => logWarning("StreamingContext has not been started yet")
|
||||
case Stopped => logWarning("StreamingContext has already been stopped")
|
||||
case Started =>
|
||||
scheduler.stop(stopGracefully)
|
||||
logInfo("StreamingContext stopped successfully")
|
||||
waiter.notifyStop()
|
||||
try {
|
||||
state match {
|
||||
case INITIALIZED =>
|
||||
logWarning("StreamingContext has not been started yet")
|
||||
case STOPPED =>
|
||||
logWarning("StreamingContext has already been stopped")
|
||||
case ACTIVE =>
|
||||
scheduler.stop(stopGracefully)
|
||||
uiTab.foreach(_.detach())
|
||||
StreamingContext.setActiveContext(null)
|
||||
waiter.notifyStop()
|
||||
logInfo("StreamingContext stopped successfully")
|
||||
}
|
||||
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
|
||||
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
|
||||
if (stopSparkContext) sc.stop()
|
||||
} finally {
|
||||
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
|
||||
state = STOPPED
|
||||
}
|
||||
// Even if the streaming context has not been started, we still need to stop the SparkContext.
|
||||
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
|
||||
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
|
||||
if (stopSparkContext) sc.stop()
|
||||
uiTab.foreach(_.detach())
|
||||
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
|
||||
state = Stopped
|
||||
StreamingContext.setActiveContext(null)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.streaming;
|
||||
|
||||
import org.apache.spark.annotation.DeveloperApi;
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
*
|
||||
* Represents the state of a StreamingContext.
|
||||
*/
|
||||
@DeveloperApi
|
||||
public enum StreamingContextState {
|
||||
/**
|
||||
* The context has been created, but not been started yet.
|
||||
* Input DStreams, transformations and output operations can be created on the context.
|
||||
*/
|
||||
INITIALIZED,
|
||||
|
||||
/**
|
||||
* The context has been started, and been not stopped.
|
||||
* Input DStreams, transformations and output operations cannot be created on the context.
|
||||
*/
|
||||
ACTIVE,
|
||||
|
||||
/**
|
||||
* The context has been stopped and cannot be used any more.
|
||||
*/
|
||||
STOPPED
|
||||
}
|
|
@ -578,6 +578,28 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
|
|||
ssc.addStreamingListener(streamingListener)
|
||||
}
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
*
|
||||
* Return the current state of the context. The context can be in three possible states -
|
||||
* <ul>
|
||||
* <li>
|
||||
* StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
|
||||
* Input DStreams, transformations and output operations can be created on the context.
|
||||
* </li>
|
||||
* <li>
|
||||
* StreamingContextState.ACTIVE - The context has been started, and been not stopped.
|
||||
* Input DStreams, transformations and output operations cannot be created on the context.
|
||||
* </li>
|
||||
* <li>
|
||||
* StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
|
||||
* </li>
|
||||
* </ul>
|
||||
*/
|
||||
def getState(): StreamingContextState = {
|
||||
ssc.getState()
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the execution of the streams.
|
||||
*/
|
||||
|
|
|
@ -70,6 +70,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
|
|||
Assert.assertNotNull(ssc.sparkContext());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testContextState() {
|
||||
List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
|
||||
Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
|
||||
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
|
||||
JavaTestUtils.attachTestOutputStream(stream);
|
||||
Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
|
||||
ssc.start();
|
||||
Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
|
||||
ssc.stop();
|
||||
Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testCount() {
|
||||
|
|
|
@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase {
|
|||
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
|
||||
implicit val cm: ClassTag[V] =
|
||||
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
|
||||
ssc.getState()
|
||||
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
|
||||
val out = new ArrayList[JList[V]]()
|
||||
res.map(entry => out.append(new ArrayList[V](entry)))
|
||||
|
|
|
@ -109,15 +109,21 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
|
|||
assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
|
||||
}
|
||||
|
||||
test("state matching") {
|
||||
import StreamingContextState._
|
||||
assert(INITIALIZED === INITIALIZED)
|
||||
assert(INITIALIZED != ACTIVE)
|
||||
}
|
||||
|
||||
test("start and stop state check") {
|
||||
ssc = new StreamingContext(master, appName, batchDuration)
|
||||
addInputStream(ssc).register()
|
||||
|
||||
assert(ssc.state === ssc.StreamingContextState.Initialized)
|
||||
assert(ssc.getState() === StreamingContextState.INITIALIZED)
|
||||
ssc.start()
|
||||
assert(ssc.state === ssc.StreamingContextState.Started)
|
||||
assert(ssc.getState() === StreamingContextState.ACTIVE)
|
||||
ssc.stop()
|
||||
assert(ssc.state === ssc.StreamingContextState.Stopped)
|
||||
assert(ssc.getState() === StreamingContextState.STOPPED)
|
||||
|
||||
// Make sure that the SparkContext is also stopped by default
|
||||
intercept[Exception] {
|
||||
|
@ -129,9 +135,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
|
|||
ssc = new StreamingContext(master, appName, batchDuration)
|
||||
addInputStream(ssc).register()
|
||||
ssc.start()
|
||||
assert(ssc.getState() === StreamingContextState.ACTIVE)
|
||||
intercept[SparkException] {
|
||||
ssc.start()
|
||||
}
|
||||
assert(ssc.getState() === StreamingContextState.ACTIVE)
|
||||
}
|
||||
|
||||
test("stop multiple times") {
|
||||
|
@ -139,13 +147,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
|
|||
addInputStream(ssc).register()
|
||||
ssc.start()
|
||||
ssc.stop()
|
||||
assert(ssc.getState() === StreamingContextState.STOPPED)
|
||||
ssc.stop()
|
||||
assert(ssc.getState() === StreamingContextState.STOPPED)
|
||||
}
|
||||
|
||||
test("stop before start") {
|
||||
ssc = new StreamingContext(master, appName, batchDuration)
|
||||
addInputStream(ssc).register()
|
||||
ssc.stop() // stop before start should not throw exception
|
||||
assert(ssc.getState() === StreamingContextState.STOPPED)
|
||||
}
|
||||
|
||||
test("start after stop") {
|
||||
|
@ -156,6 +167,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
|
|||
intercept[SparkException] {
|
||||
ssc.start() // start after stop should throw exception
|
||||
}
|
||||
assert(ssc.getState() === StreamingContextState.STOPPED)
|
||||
}
|
||||
|
||||
test("stop only streaming context") {
|
||||
|
@ -167,6 +179,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
|
|||
addInputStream(ssc).register()
|
||||
ssc.start()
|
||||
ssc.stop(stopSparkContext = false)
|
||||
assert(ssc.getState() === StreamingContextState.STOPPED)
|
||||
assert(sc.makeRDD(1 to 100).collect().size === 100)
|
||||
sc.stop()
|
||||
|
||||
|
|
Loading…
Reference in a new issue