[SPARK-18754][SS] Rename recentProgresses to recentProgress
Based on an informal survey, users find this option easier to understand / remember.

Author: Michael Armbrust <michael@databricks.com>

Closes #16182 from marmbrus/renameRecentProgress.
commit 70b2bf717d (parent edc87e1892)
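For reference, a minimal sketch of the renamed API (not part of this commit): it drives a tiny query through Spark's internal MemoryStream test source, mirroring the test changes below. Identifiers such as "recentProgress-demo", "demo", and `inputData` are illustrative.

// Sketch only: sum the input rows reported by the retained progress updates.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("recentProgress-demo").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

val inputData = MemoryStream[Int]
val query = inputData.toDS().writeStream.format("memory").queryName("demo").start()

inputData.addData(1, 2, 3)
query.processAllAvailable()

// Before this commit the call was query.recentProgresses.
val recordsRead = query.recentProgress.map(_.numInputRows).sum
assert(recordsRead == 3)
query.stop()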
@@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
       AddKafkaData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
       AssertOnQuery { query =>
-        val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+        val recordsRead = query.recentProgress.map(_.numInputRows).sum
         recordsRead == 3
       }
     )
@@ -91,7 +91,7 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
@@ -114,12 +114,12 @@ class StreamingQuery(object):

     @property
     @since(2.1)
-    def recentProgresses(self):
+    def recentProgress(self):
         """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
         The number of progress updates retained for each stream is configured by Spark session
-        configuration `spark.sql.streaming.numRecentProgresses`.
+        configuration `spark.sql.streaming.numRecentProgressUpdates`.
         """
-        return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+        return [json.loads(p.json()) for p in self._jsq.recentProgress()]

     @property
     @since(2.1)
@@ -1116,11 +1116,11 @@ class SQLTests(ReusedPySparkTestCase):
         try:
             q.processAllAvailable()
             lastProgress = q.lastProgress
-            recentProgresses = q.recentProgresses
+            recentProgress = q.recentProgress
             status = q.status
             self.assertEqual(lastProgress['name'], q.name)
             self.assertEqual(lastProgress['id'], q.id)
-            self.assertTrue(any(p == lastProgress for p in recentProgresses))
+            self.assertTrue(any(p == lastProgress for p in recentProgress))
             self.assertTrue(
                 "message" in status and
                 "isDataAvailable" in status and
@@ -94,7 +94,7 @@ trait ProgressReporter extends Logging {
   def status: StreamingQueryStatus = currentStatus

   /** Returns an array containing the most recent query progress updates. */
-  def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
     progressBuffer.toArray
   }
@@ -617,7 +617,7 @@ object SQLConf {
     .createWithDefault(false)

   val STREAMING_PROGRESS_RETENTION =
-    SQLConfigBuilder("spark.sql.streaming.numRecentProgresses")
+    SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates")
       .doc("The number of progress updates to retain for a streaming query")
       .intConf
       .createWithDefault(100)
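The renamed retention setting can be adjusted like any other runtime SQL conf; a hedged sketch, assuming an active SparkSession `spark` (the value 50 is an arbitrary example, the default above is 100):

// Sketch: control how many StreamingQueryProgress updates each query retains.
spark.conf.set("spark.sql.streaming.numRecentProgressUpdates", "50")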
@@ -87,11 +87,11 @@ trait StreamingQuery {
   /**
    * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
    * The number of progress updates retained for each stream is configured by Spark session
-   * configuration `spark.sql.streaming.numRecentProgresses`.
+   * configuration `spark.sql.streaming.numRecentProgressUpdates`.
    *
    * @since 2.1.0
    */
-  def recentProgresses: Array[StreamingQueryProgress]
+  def recentProgress: Array[StreamingQueryProgress]

   /**
    * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
@@ -263,9 +263,9 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
     try {
       inputData.addData(10, 11, 12)
       query.processAllAvailable()
-      val recentProgress = query.recentProgresses.filter(_.numInputRows != 0).headOption
+      val recentProgress = query.recentProgress.filter(_.numInputRows != 0).headOption
       assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3,
-        s"recentProgresses[${query.recentProgresses.toList}] doesn't contain correct metrics")
+        s"recentProgress[${query.recentProgress.toList}] doesn't contain correct metrics")
     } finally {
       query.stop()
     }
@@ -1006,7 +1006,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       AddTextFileData("100", src, tmp),
       CheckAnswer("100"),
       AssertOnQuery { query =>
-        val actualProgress = query.recentProgresses
+        val actualProgress = query.recentProgress
           .find(_.numInputRows > 0)
           .getOrElse(sys.error("Could not find records with data."))
         assert(actualProgress.numInputRows === 1)
@@ -237,9 +237,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         }
         true
       }
-      // `recentProgresses` should not receive too many no data events
+      // `recentProgress` should not receive too many no data events
       actions += AssertOnQuery { q =>
-        q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+        q.recentProgress.size > 1 && q.recentProgress.size <= 11
       }
       testStream(input.toDS)(actions: _*)
       spark.sparkContext.listenerBus.waitUntilEmpty(10000)
@@ -152,7 +152,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }

-  testQuietly("status, lastProgress, and recentProgresses") {
+  testQuietly("status, lastProgress, and recentProgress") {
     import StreamingQuerySuite._
     clock = new StreamManualClock
@@ -201,7 +201,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

       // Test status and progress while offset is being fetched
       AddData(inputData, 1, 2),
@@ -210,7 +210,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

       // Test status and progress while batch is being fetched
       AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
@@ -218,14 +218,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

       // Test status and progress while batch is being processed
       AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

       // Test status and progress while batch processing has completed
       AdvanceManualClock(500), // time = 1100 to unblock job
@@ -236,8 +236,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
       AssertOnQuery { query =>
         assert(query.lastProgress != null)
-        assert(query.recentProgresses.exists(_.numInputRows > 0))
-        assert(query.recentProgresses.last.eq(query.lastProgress))
+        assert(query.recentProgress.exists(_.numInputRows > 0))
+        assert(query.recentProgress.last.eq(query.lastProgress))

         val progress = query.lastProgress
         assert(progress.id === query.id)
@@ -274,7 +274,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
       AssertOnQuery { query =>
-        assert(query.recentProgresses.last.eq(query.lastProgress))
+        assert(query.recentProgress.last.eq(query.lastProgress))
         assert(query.lastProgress.batchId === 1)
         assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
         true
@@ -408,7 +408,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     try {
       val q = streamingDF.writeStream.format("memory").queryName("test").start()
       q.processAllAvailable()
-      q.recentProgresses.head
+      q.recentProgress.head
     } finally {
       spark.streams.active.map(_.stop())
     }