[SPARK-21421][SS] Add the query id as a local property to allow source and sink using it

## What changes were proposed in this pull request?

Add the query id as a local property so that sources and sinks can use it.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #18638 from zsxwing/SPARK-21421.
This commit is contained in:
Shixiong Zhu 2017-07-14 14:37:27 -07:00
parent 601a237b30
commit 2d968a07d2
2 changed files with 31 additions and 0 deletions

View file

@@ -263,6 +263,7 @@ class StreamExecution(
try {
sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
interruptOnCancel = true)
sparkSession.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, id.toString)
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
@@ -842,6 +843,9 @@ class StreamExecution(
}
}
object StreamExecution {
  /**
   * Key of the Spark local property under which the running streaming query's stable `id`
   * is published, so that a [[Source]] or sink can read it via
   * `sparkContext.getLocalProperty` while a batch is executing.
   */
  val QUERY_ID_KEY: String = "sql.streaming.queryId"
}
/**
* A special thread to run the stream query. Some codes require to run in the StreamExecutionThread

View file

@@ -613,6 +613,33 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}
test("get the query id in source") {
  // Captured inside the source callback; volatile because the stream runs on its own thread.
  @volatile var observedQueryId: String = null

  // A stub source that never produces data but records the query-id local property
  // each time the engine asks it for an offset.
  val recordingSource = new Source {
    override def schema: StructType = MockSourceProvider.fakeSchema

    override def getOffset: Option[Offset] = {
      observedQueryId = spark.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
      None
    }

    override def getBatch(start: Option[Offset], end: Offset): DataFrame = spark.emptyDataFrame

    override def stop(): Unit = {}
  }

  MockSourceProvider.withMockSources(recordingSource) {
    val streamDF = spark.readStream
      .format("org.apache.spark.sql.streaming.util.MockSourceProvider")
      .load()
    testStream(streamDF)(
      AssertOnQuery { query =>
        query.processAllAvailable()
        // The local property must carry the stable query id, not the per-run id.
        assert(query.id.toString === observedQueryId)
        assert(query.runId.toString !== observedQueryId)
        true
      }
    )
  }
}
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
require(!triggerDF.isStreaming)