From 06694f1c68cb752ea311144f0dbe50e92e1393cf Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Sat, 2 Apr 2016 08:12:04 -0700
Subject: [PATCH] [MINOR] Typo fixes

## What changes were proposed in this pull request?

Typo fixes. No functional changes.

## How was this patch tested?

Built the sources and ran with samples.

Author: Jacek Laskowski

Closes #11802 from jaceklaskowski/typo-fixes.
---
 .../RecoverableNetworkWordCount.scala          |  2 +-
 .../scala/org/apache/spark/ml/Pipeline.scala   |  2 +-
 .../ml/regression/LinearRegression.scala       |  2 +-
 .../catalyst/plans/logical/LogicalPlan.scala   |  4 ++--
 .../spark/sql/ExperimentalMethods.scala        |  2 +-
 .../execution/joins/BroadcastHashJoin.scala    |  2 +-
 .../org/apache/spark/sql/functions.scala       | 12 +++++-----
 .../spark/streaming/StreamingContext.scala     | 13 ++++++-----
 .../dstream/ConstantInputDStream.scala         |  2 +-
 .../spark/streaming/dstream/DStream.scala      |  8 +++----
 .../dstream/DStreamCheckpointData.scala        |  6 ++---
 .../streaming/dstream/InputDStream.scala       |  6 ++---
 .../dstream/ReducedWindowedDStream.scala       |  2 +-
 .../streaming/dstream/StateDStream.scala       | 12 +++++-----
 .../scheduler/ReceivedBlockTracker.scala       |  4 ++--
 .../scheduler/rate/RateEstimator.scala         | 22 ++++++++++---------
 16 files changed, 52 insertions(+), 49 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 05f8e65d65..b6b8bc33f7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -141,7 +141,7 @@ object RecoverableNetworkWordCount {
 
   def main(args: Array[String]) {
     if (args.length != 4) {
-      System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
       System.err.println(
         """
           |Usage: RecoverableNetworkWordCount
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 3a99979a88..afefaaa883 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -147,7 +147,7 @@ class Pipeline @Since("1.4.0") (
             t
           case _ =>
             throw new IllegalArgumentException(
-              s"Do not support stage $stage of type ${stage.getClass}")
+              s"Does not support stage $stage of type ${stage.getClass}")
         }
         if (index < indexOfLastEstimator) {
           curDataset = transformer.transform(curDataset)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index ba5ad4c072..2633c06f40 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -58,7 +58,7 @@ private[regression] trait LinearRegressionParams extends PredictorParams
  * The specific squared error loss function used is:
  *   L = 1/2n ||A coefficients - y||^2^
  *
- * This support multiple types of regularization:
+ * This supports multiple types of regularization:
  *  - none (a.k.a. ordinary least squares)
  *  - L2 (ridge regression)
  *  - L1 (Lasso)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index ecf4285c46..aceeb8aadc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -79,13 +79,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
   /**
    * Computes [[Statistics]] for this plan. The default implementation assumes the output
-   * cardinality is the product of of all child plan's cardinality, i.e. applies in the case
+   * cardinality is the product of all child plan's cardinality, i.e. applies in the case
    * of cartesian joins.
    *
    * [[LeafNode]]s must override this.
    */
   def statistics: Statistics = {
-    if (children.size == 0) {
+    if (children.isEmpty) {
       throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
     }
     Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
index d7cd84fd24..c5df028485 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -37,7 +37,7 @@ class ExperimentalMethods private[sql]() {
 
   /**
    * Allows extra strategies to be injected into the query planner at runtime. Note this API
-   * should be consider experimental and is not intended to be stable across releases.
+   * should be considered experimental and is not intended to be stable across releases.
    *
    * @since 1.3.0
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index f5b083c216..0ed1ed41b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.collection.CompactBuffer
 /**
  * Performs an inner hash join of two child relations. When the output RDD of this operator is
  * being constructed, a Spark job is asynchronously started to calculate the values for the
- * broadcasted relation. This data is then placed in a Spark broadcast variable. The streamed
+ * broadcast relation. This data is then placed in a Spark broadcast variable. The streamed
 * relation is not shuffled.
 */
case class BroadcastHashJoin(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 74906050ac..baf947d037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2232,7 +2232,7 @@ object functions {
   /**
    * Splits str around pattern (pattern is a regular expression).
-   * NOTE: pattern is a string represent the regular expression.
+   * NOTE: pattern is a string representation of the regular expression.
    *
    * @group string_funcs
    * @since 1.5.0
@@ -2267,9 +2267,9 @@ object functions {
   /**
    * Translate any character in the src by a character in replaceString.
-   * The characters in replaceString is corresponding to the characters in matchingString.
-   * The translate will happen when any character in the string matching with the character
-   * in the matchingString.
+   * The characters in replaceString correspond to the characters in matchingString.
+   * The translate will happen when any character in the string matches the character
+   * in the `matchingString`.
    *
    * @group string_funcs
    * @since 1.5.0
   */
@@ -2692,7 +2692,7 @@ object functions {
   //////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Returns true if the array contain the value
+   * Returns true if the array contains `value`
    * @group collection_funcs
    * @since 1.5.0
   */
@@ -2920,7 +2920,7 @@ object functions {
 
   /**
    * Defines a user-defined function (UDF) using a Scala closure. For this variant, the caller must
-   * specifcy the output data type, and there is no automatic input type coercion.
+   * specify the output data type, and there is no automatic input type coercion.
    *
    * @param f A closure in Scala
    * @param dataType The output data type of the UDF
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 3a664c4f5c..c1e151d08b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -132,7 +132,7 @@ class StreamingContext private[streaming] (
       "both SparkContext and checkpoint as null")
   }
 
-  private[streaming] val isCheckpointPresent = (_cp != null)
+  private[streaming] val isCheckpointPresent: Boolean = _cp != null
 
   private[streaming] val sc: SparkContext = {
     if (_sc != null) {
@@ -213,8 +213,8 @@ class StreamingContext private[streaming] (
   def sparkContext: SparkContext = sc
 
   /**
-   * Set each DStreams in this context to remember RDDs it generated in the last given duration.
-   * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+   * Set each DStream in this context to remember RDDs it generated in the last given duration.
+   * DStreams remember RDDs only for a limited duration of time and release them for garbage
    * collection. This method allows the developer to specify how long to remember the RDDs (
    * if the developer wishes to query old data outside the DStream computation).
    * @param duration Minimum duration that each DStream should remember its RDDs
@@ -282,13 +282,14 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Create a input stream from TCP source hostname:port. Data is received using
+   * Creates an input stream from TCP source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
    * lines.
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
    *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   * @see [[socketStream]]
    */
   def socketTextStream(
       hostname: String,
@@ -299,7 +300,7 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Create a input stream from TCP source hostname:port. Data is received using
+   * Creates an input stream from TCP source hostname:port. Data is received using
    * a TCP socket and the receive bytes it interpreted as object using the given
    * converter.
    * @param hostname      Hostname to connect to for receiving data
@@ -860,7 +861,7 @@ private class StreamingContextPythonHelper {
    */
   def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
     val checkpointOption = CheckpointReader.read(
-      checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+      checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = false)
     checkpointOption.map(new StreamingContext(null, _, null))
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index b5f86fe779..995470ec8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{StreamingContext, Time}
 
 /**
- * An input stream that always returns the same RDD on each timestep. Useful for testing.
+ * An input stream that always returns the same RDD on each time step. Useful for testing.
 */
class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
  extends InputDStream[T](_ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index eb7b64eaf4..c40beeff97 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -83,7 +83,7 @@ abstract class DStream[T: ClassTag] (
 
   // RDDs generated, marked as private[streaming] so that testsuites can access it
   @transient
-  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
+  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
 
   // Time zero for the DStream
   private[streaming] var zeroTime: Time = null
@@ -269,7 +269,7 @@ abstract class DStream[T: ClassTag] (
       checkpointDuration == null || rememberDuration > checkpointDuration,
       s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
         s" $rememberDuration which is not more than the checkpoint interval" +
-        s" ($checkpointDuration). Please set it to higher than $checkpointDuration."
+        s" ($checkpointDuration). Please set it to a value higher than $checkpointDuration."
     )
 
     dependencies.foreach(_.validateAtStart())
@@ -277,7 +277,7 @@ abstract class DStream[T: ClassTag] (
     logInfo(s"Slide time = $slideDuration")
     logInfo(s"Storage level = ${storageLevel.description}")
     logInfo(s"Checkpoint interval = $checkpointDuration")
-    logInfo(s"Remember duration = $rememberDuration")
+    logInfo(s"Remember interval = $rememberDuration")
     logInfo(s"Initialized and validated $this")
   }
 
@@ -535,7 +535,7 @@ abstract class DStream[T: ClassTag] (
   private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(s"${this.getClass().getSimpleName}.readObject used")
     ois.defaultReadObject()
-    generatedRDDs = new HashMap[Time, RDD[T]] ()
+    generatedRDDs = new HashMap[Time, RDD[T]]()
   }
 
   // =======================================================================
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 365a6bc417..431c9dbe2c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Time
 import org.apache.spark.util.Utils
 
 private[streaming]
-class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
  extends Serializable with Logging {
 
  protected val data = new HashMap[Time, AnyRef]()
@@ -45,7 +45,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])

  /**
   * Updates the checkpoint data of the DStream. This gets called every time
   * the graph checkpoint is initiated. Default implementation records the
-   * checkpoint files to which the generate RDDs of the DStream has been saved.
+   * checkpoint files at which the generated RDDs of the DStream have been saved.
   */
  def update(time: Time) {
@@ -103,7 +103,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])

  /**
   * Restore the checkpoint data. This gets called once when the DStream graph
-   * (along with its DStreams) are being restored from a graph checkpoint file.
+   * (along with its output DStreams) is being restored from a graph checkpoint file.
   * Default implementation restores the RDDs from their checkpoint files.
   */
  def restore() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 0b6b191dbe..dc88349db5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
 *
 * @param _ssc Streaming context that will execute this input stream
 */
-abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
  extends DStream[T](_ssc) {

  private[streaming] var lastValidTime: Time = null
@@ -90,8 +90,8 @@ abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
    } else {
      // Time is valid, but check it it is more than lastValidTime
      if (lastValidTime != null && time < lastValidTime) {
-        logWarning("isTimeValid called with " + time + " where as last valid time is " +
-          lastValidTime)
+        logWarning(s"isTimeValid called with $time whereas the last valid time " +
+          s"is $lastValidTime")
      }
      lastValidTime = time
      true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index a9be2f213f..a9e93838b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     logDebug("Window time = " + windowDuration)
     logDebug("Slide time = " + slideDuration)
-    logDebug("ZeroTime = " + zeroTime)
+    logDebug("Zero time = " + zeroTime)
     logDebug("Current window = " + currentWindow)
     logDebug("Previous window = " + previousWindow)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 68eff89030..0379957e58 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -70,7 +70,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
-            computeUsingPreviousRDD (parentRDD, prevStateRDD)
+            computeUsingPreviousRDD(parentRDD, prevStateRDD)
          }
          case None => {    // If parent RDD does not exist
@@ -98,15 +98,15 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
-              updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
+              updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
            }

-            val groupedRDD = parentRDD.groupByKey (partitioner)
-            val sessionRDD = groupedRDD.mapPartitions (finalFunc, preservePartitioning)
+            val groupedRDD = parentRDD.groupByKey(partitioner)
+            val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
            // logDebug("Generating state RDD for time " + validTime + " (first)")
-            Some (sessionRDD)
+            Some(sessionRDD)
          }
-          case Some (initialStateRDD) => {
+          case Some(initialStateRDD) => {
            computeUsingPreviousRDD(parentRDD, initialStateRDD)
          }
        }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 9c8e68b03d..5d9a8ac0d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -119,7 +119,7 @@ private[streaming] class ReceivedBlockTracker(
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
-        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
@@ -129,7 +129,7 @@ private[streaming] class ReceivedBlockTracker(
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      //    lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
-      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index d7210f64fc..7b2ef6881d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -21,18 +21,20 @@ import org.apache.spark.SparkConf
 import org.apache.spark.streaming.Duration

 /**
- * A component that estimates the rate at wich an InputDStream should ingest
- * elements, based on updates at every batch completion.
+ * A component that estimates the rate at which an `InputDStream` should ingest
+ * records, based on updates at every batch completion.
+ *
+ * @see [[org.apache.spark.streaming.scheduler.RateController]]
 */
private[streaming] trait RateEstimator extends Serializable {

  /**
-   * Computes the number of elements the stream attached to this `RateEstimator`
+   * Computes the number of records the stream attached to this `RateEstimator`
   * should ingest per second, given an update on the size and completion
   * times of the latest batch.
   *
-   * @param time The timetamp of the current batch interval that just finished
-   * @param elements The number of elements that were processed in this batch
+   * @param time The timestamp of the current batch interval that just finished
+   * @param elements The number of records that were processed in this batch
   * @param processingDelay The time in ms that took for the job to complete
   * @param schedulingDelay The time in ms that the job spent in the scheduling queue
   */
@@ -46,13 +48,13 @@ private[streaming] trait RateEstimator extends Serializable {
 object RateEstimator {

  /**
-   * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
+   * Return a new `RateEstimator` based on the value of
+   * `spark.streaming.backpressure.rateEstimator`.
   *
-   * The only known estimator right now is `pid`.
+   * The only known and acceptable estimator right now is `pid`.
   *
   * @return An instance of RateEstimator
-   * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
-   *         known estimators.
+   * @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
   */
  def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
@@ -64,6 +66,6 @@ object RateEstimator {
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)

      case estimator =>
-        throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
+        throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }
}
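The final hunk above touches the doc for `RateEstimator.create`, which resolves the `spark.streaming.backpressure.rateEstimator` setting. For context, a minimal sketch of how that estimator is typically selected and tuned from a `SparkConf`; the `spark.streaming.backpressure.*` keys other than `rateEstimator` are standard Spark settings assumed here, and the values are illustrative rather than recommendations:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: enable backpressure so the PID estimator produced by
// RateEstimator.create() governs how many records each receiver ingests per second.
val conf = new SparkConf()
  .setAppName("BackpressureSketch")
  .set("spark.streaming.backpressure.enabled", "true")
  // "pid" is the only estimator RateEstimator.create() accepts.
  .set("spark.streaming.backpressure.rateEstimator", "pid")
  // PID tuning knobs read when the estimator is constructed; defaults are usually fine.
  .set("spark.streaming.backpressure.pid.proportional", "1.0")
  .set("spark.streaming.backpressure.pid.integral", "0.2")
  .set("spark.streaming.backpressure.pid.derived", "0.0")
  .set("spark.streaming.backpressure.pid.minRate", "100")

// The batch interval passed to the StreamingContext is the same Duration that
// RateEstimator.create() receives as batchInterval.
val ssc = new StreamingContext(conf, Seconds(1))
```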