[SPARK-14556][SQL] Code clean-ups for package o.a.s.sql.execution.streaming.state

## What changes were proposed in this pull request?

- `StateStoreConf.**max**DeltasForSnapshot` was renamed to `StateStoreConf.**min**DeltasForSnapshot`
- some state switch checks were added
- improved consistency between method names and string literals
- other comments & typo fix

## How was this patch tested?

N/A

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12323 from lw-lin/streaming-state-clean-up.
This commit is contained in:
Liwei Lin 2016-04-12 11:50:51 -07:00 committed by Shixiong Zhu
parent 111a62474a
commit 852bbc6c00
5 changed files with 31 additions and 32 deletions

View file

@ -46,12 +46,14 @@ import org.apache.spark.util.Utils
* Usage:
* To update the data in the state store, the following order of operations are needed.
*
* - val store = StateStore.get(operatorId, partitionId, version) // to get the right store
* - store.update(...)
* // get the right store
* - val store = StateStore.get(
* StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...)
* - store.put(...)
* - store.remove(...)
* - store.commit() // commits all the updates to made with version number
* - store.commit() // commits all the updates to made; the new version will be returned
* - store.iterator() // key-value data after last commit as an iterator
* - store.updates() // updates made in the last as an iterator
* - store.updates() // updates made in the last commit as an iterator
*
* Fault-tolerance model:
* - Every set of updates is written to a delta file before committing.
@ -99,7 +101,7 @@ private[state] class HDFSBackedStateStoreProvider(
}
override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
verify(state == UPDATING, "Cannot remove after already committed or cancelled")
verify(state == UPDATING, "Cannot remove after already committed or aborted")
val isNewKey = !mapToUpdate.containsKey(key)
mapToUpdate.put(key, value)
@ -109,7 +111,7 @@ private[state] class HDFSBackedStateStoreProvider(
// Value did not exist in previous version and was added already, keep it marked as added
allUpdates.put(key, ValueAdded(key, value))
case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) =>
// Value existed in prev version and updated/removed, mark it as updated
// Value existed in previous version and updated/removed, mark it as updated
allUpdates.put(key, ValueUpdated(key, value))
case None =>
// There was no prior update, so mark this as added or updated according to its presence
@ -122,7 +124,7 @@ private[state] class HDFSBackedStateStoreProvider(
/** Remove keys that match the following condition */
override def remove(condition: UnsafeRow => Boolean): Unit = {
verify(state == UPDATING, "Cannot remove after already committed or cancelled")
verify(state == UPDATING, "Cannot remove after already committed or aborted")
val keyIter = mapToUpdate.keySet().iterator()
while (keyIter.hasNext) {
val key = keyIter.next
@ -146,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider(
/** Commit all the updates that have been made to the store, and return the new version. */
override def commit(): Long = {
verify(state == UPDATING, "Cannot commit after already committed or cancelled")
verify(state == UPDATING, "Cannot commit after already committed or aborted")
try {
finalizeDeltaFile(tempDeltaFileStream)
@ -161,8 +163,10 @@ private[state] class HDFSBackedStateStoreProvider(
}
}
/** Cancel all the updates made on this store. This store will not be usable any more. */
/** Abort all the updates made on this store. This store will not be usable any more. */
override def abort(): Unit = {
verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
state = ABORTED
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
@ -170,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider(
if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
fs.delete(tempDeltaFile, true)
}
logInfo("Canceled ")
logInfo("Aborted")
}
/**
@ -178,7 +182,8 @@ private[state] class HDFSBackedStateStoreProvider(
* This can be called only after committing all the updates made in the current thread.
*/
override def iterator(): Iterator[(UnsafeRow, UnsafeRow)] = {
verify(state == COMMITTED, "Cannot get iterator of store data before committing")
verify(state == COMMITTED,
"Cannot get iterator of store data before committing or after aborting")
HDFSBackedStateStoreProvider.this.iterator(newVersion)
}
@ -187,7 +192,8 @@ private[state] class HDFSBackedStateStoreProvider(
* This can be called only after committing all the updates made in the current thread.
*/
override def updates(): Iterator[StoreUpdate] = {
verify(state == COMMITTED, "Cannot get iterator of updates before committing")
verify(state == COMMITTED,
"Cannot get iterator of updates before committing or after aborting")
allUpdates.values().asScala.toIterator
}
@ -223,7 +229,7 @@ private[state] class HDFSBackedStateStoreProvider(
}
override def toString(): String = {
s"StateStore[id = (op=${id.operatorId},part=${id.partitionId}), dir = $baseDir]"
s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
}
/* Internal classes and methods */
@ -277,7 +283,7 @@ private[state] class HDFSBackedStateStoreProvider(
} else {
if (!fs.isDirectory(baseDir)) {
throw new IllegalStateException(
s"Cannot use ${id.checkpointLocation} for storing state data for $this as" +
s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
s"$baseDir already exists and is not a directory")
}
}
@ -453,11 +459,11 @@ private[state] class HDFSBackedStateStoreProvider(
filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
synchronized { loadedMaps.get(lastVersion) } match {
case Some(map) =>
if (deltaFilesForLastVersion.size > storeConf.maxDeltasForSnapshot) {
if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
writeSnapshotFile(lastVersion, map)
}
case None =>
// The last map is not loaded, probably some other instance is incharge
// The last map is not loaded, probably some other instance is in charge
}
}
@ -506,7 +512,6 @@ private[state] class HDFSBackedStateStoreProvider(
.lastOption
val deltaBatchFiles = latestSnapshotFileBeforeVersion match {
case Some(snapshotFile) =>
val deltaBatchIds = (snapshotFile.version + 1) to version
val deltaFiles = allFiles.filter { file =>
file.version > snapshotFile.version && file.version <= version
@ -579,4 +584,3 @@ private[state] class HDFSBackedStateStoreProvider(
}
}
}

View file

@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.streaming.state
import java.util.Timer
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
@ -63,7 +62,7 @@ trait StateStore {
*/
def commit(): Long
/** Cancel all the updates that have been made to the store. */
/** Abort all the updates that have been made to the store. */
def abort(): Unit
/**
@ -109,8 +108,8 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
/**
* Companion object to [[StateStore]] that provides helper methods to create and retrieve stores
* by their unique ids. In addition, when a SparkContext is active (i.e. SparkEnv.get is not null),
* it also runs a periodic background tasks to do maintenance on the loaded stores. For each
* store, tt uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
* it also runs a periodic background task to do maintenance on the loaded stores. For each
* store, it uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
* the store is the active instance. Accordingly, it either keeps it loaded and performs
* maintenance, or unloads the store.
*/

View file

@ -26,7 +26,7 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
import SQLConf._
val maxDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
}
@ -34,4 +34,3 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
private[streaming] object StateStoreConf {
val empty = new StateStoreConf()
}

View file

@ -50,8 +50,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
private val endpointName = "StateStoreCoordinator"
/**
* Create a reference to a [[StateStoreCoordinator]], This can be called from driver as well as
* executors.
* Create a reference to a [[StateStoreCoordinator]]
*/
def forDriver(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
try {
@ -75,7 +74,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
}
/**
* Reference to a [[StateStoreCoordinator]] that can be used to coordinator instances of
* Reference to a [[StateStoreCoordinator]] that can be used to coordinate instances of
* [[StateStore]]s across all the executors, and get their locations for job scheduling.
*/
private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
@ -142,5 +141,3 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
context.reply(true)
}
}

View file

@ -22,12 +22,12 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.SerializableConfiguration
/**
* An RDD that allows computations to be executed against [[StateStore]]s. It
* uses the [[StateStoreCoordinator]] to use the locations of loaded state stores as
* preferred locations.
* uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
* and use that as the preferred locations.
*/
class StateStoreRDD[T: ClassTag, U: ClassTag](
dataRDD: RDD[T],