[MINOR] Remove inappropriate type notation and extra anonymous closure within functional transformations

## What changes were proposed in this pull request?

This PR removes

- Inappropriate type notations
    For example, from
    ```scala
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
    ...
    ```
    to
    ```scala
    words.foreachRDD { (rdd, time) =>
    ...
    ```

- Extra anonymous closures within functional transformations.
    For example,
    ```scala
    .map(item => {
      ...
    })
    ```

    which can simply be written as below:

    ```scala
    .map { item =>
      ...
    }
    ```

It also corrects some obvious style nits.

## How was this patch tested?

This was tested by temporarily adding the rules below to `scalastyle-config.xml`, although they did not catch every occurrence perfectly.

The rules applied were as follows:

- For the extra anonymous closures,

```xml
<check customId="NoExtraClosure" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">(?m)\.[a-zA-Z_][a-zA-Z0-9]*\(\s*[^,]+\s*=>\s*\{[^\}]+\}\s*\)</parameter></parameters>
</check>
```

```xml
<check customId="NoExtraClosure" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]([^\n>,]+=>)?\s*\{([^()]|(?R))*\}^[,]</parameter></parameters>
</check>
```

- For the inappropriate type notation,

```xml
<check customId="TypeNotation" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]\s*\([^):]*:R))*\}^[,]</parameter></parameters>
</check>
```

**Note that these rules were not added to `scalastyle-config.xml` in this commit.**
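For reference, here is a minimal, self-contained Scala sketch (not part of this commit; the object name and sample data are invented for illustration) of the two patterns the rules above are meant to flag, alongside the preferred forms from the description:

```scala
object StyleNitExamples {
  def main(args: Array[String]): Unit = {
    val words = Seq("spark ", " scala", "style")

    // Inappropriate type notation: the parameter type can be inferred.
    val lengths = words.map((w: String) => w.length)
    // Preferred: let the compiler infer the type.
    val lengthsInferred = words.map(_.length)

    // Extra anonymous closure: a block wrapped inside .map(x => { ... }).
    val trimmed = words.map(w => {
      w.trim
    })
    // Preferred: pass the multi-line closure with braces directly.
    val trimmedPreferred = words.map { w =>
      w.trim
    }

    println(lengths == lengthsInferred)   // true
    println(trimmed == trimmedPreferred)  // true
  }
}
```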

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12413 from HyukjinKwon/SPARK-style.
Authored by hyukjinkwon on 2016-04-16 14:56:23 +01:00, committed by Sean Owen
commit 9f678e9754 (parent 527c780bb0)
19 changed files with 54 additions and 64 deletions

```diff
@@ -149,9 +149,7 @@ case class ExceptionFailure(
     this(e, accumUpdates, preserveCause = true)
   }

-  def exception: Option[Throwable] = exceptionWrapper.flatMap {
-    (w: ThrowableSerializationWrapper) => Option(w.exception)
-  }
+  def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))

   override def toErrorString: String =
     if (fullStackTrace == null) {
```

```diff
@@ -105,7 +105,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
-    new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
+    new JavaDoubleRDD(rdd.map(f.call(_).doubleValue()))
   }

   /**
@@ -131,7 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
     def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
-    new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
+    new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue()))
   }

   /**
@@ -173,7 +173,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
-    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
+    new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue()))
   }

   /**
@@ -196,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
-      .map(x => x.doubleValue()))
+      .map(_.doubleValue()))
   }

   /**
@@ -215,7 +215,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
-    rdd.foreachPartition((x => f.call(x.asJava)))
+    rdd.foreachPartition(x => f.call(x.asJava))
   }

   /**
```

```diff
@@ -35,9 +35,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
-    val app = state.activeApps.find(_.id == appId).getOrElse({
-      state.completedApps.find(_.id == appId).getOrElse(null)
-    })
+    val app = state.activeApps.find(_.id == appId)
+      .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {
       val msg = <div class="row-fluid">No running application with ID {appId}</div>
       return UIUtils.basicSparkPage(msg, "Not Found")
```

```diff
@@ -68,7 +68,10 @@ private[deploy] class DriverRunner(
   private var clock: Clock = new SystemClock()
   private var sleeper = new Sleeper {
-    def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
+    def sleep(seconds: Int): Unit = (0 until seconds).takeWhile { _ =>
+      Thread.sleep(1000)
+      !killed
+    }
   }

   /** Starts a thread to run and manage the driver. */
@@ -116,7 +119,7 @@ private[deploy] class DriverRunner(
   /** Terminate this driver (or prevent it from ever starting if not yet started) */
   private[worker] def kill() {
     synchronized {
-      process.foreach(p => p.destroy())
+      process.foreach(_.destroy())
       killed = true
     }
   }
```

```diff
@@ -190,11 +190,11 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
   // initializes/resets to start iterating from the beginning
   def resetIterator(): Iterator[(String, Partition)] = {
-    val iterators = (0 to 2).map( x =>
-      prev.partitions.iterator.flatMap(p => {
+    val iterators = (0 to 2).map { x =>
+      prev.partitions.iterator.flatMap { p =>
         if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
-      } )
-    )
+      }
+    }
     iterators.reduceLeft((x, y) => x ++ y)
   }
```

```diff
@@ -65,11 +65,11 @@ class JdbcRDD[T: ClassTag](
   override def getPartitions: Array[Partition] = {
     // bounds are inclusive, hence the + 1 here and - 1 on end
     val length = BigInt(1) + upperBound - lowerBound
-    (0 until numPartitions).map(i => {
+    (0 until numPartitions).map { i =>
       val start = lowerBound + ((i * length) / numPartitions)
       val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
       new JdbcPartition(i, start.toLong, end.toLong)
-    }).toArray
+    }.toArray
   }

   override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
```

```diff
@@ -129,7 +129,7 @@ private object ParallelCollectionRDD {
     }
     seq match {
       case r: Range =>
-        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+        positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
           // If the range is inclusive, use inclusive range for the last slice
           if (r.isInclusive && index == numSlices - 1) {
             new Range.Inclusive(r.start + start * r.step, r.end, r.step)
@@ -137,7 +137,7 @@ private object ParallelCollectionRDD {
           else {
             new Range(r.start + start * r.step, r.start + end * r.step, r.step)
           }
-        }).toSeq.asInstanceOf[Seq[Seq[T]]]
+        }.toSeq.asInstanceOf[Seq[Seq[T]]]
       case nr: NumericRange[_] =>
         // For ranges of Long, Double, BigInteger, etc
         val slices = new ArrayBuffer[Seq[T]](numSlices)
@@ -150,10 +150,9 @@ private object ParallelCollectionRDD {
         slices
       case _ =>
         val array = seq.toArray // To prevent O(n^2) operations for List etc
-        positions(array.length, numSlices).map({
-          case (start, end) =>
+        positions(array.length, numSlices).map { case (start, end) =>
             array.slice(start, end).toSeq
-        }).toSeq
+        }.toSeq
     }
   }
 }
```

```diff
@@ -59,10 +59,10 @@ object StreamingTestExample {
     val conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample")
     val ssc = new StreamingContext(conf, batchDuration)
-    ssc.checkpoint({
+    ssc.checkpoint {
       val dir = Utils.createTempDir()
       dir.toString
-    })
+    }

     // $example on$
     val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {
```

```diff
@@ -115,8 +115,8 @@ object RecoverableNetworkWordCount {
     // words in input stream of \n delimited text (eg. generated by 'nc')
     val lines = ssc.socketTextStream(ip, port)
     val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
+    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
+    wordCounts.foreachRDD { (rdd, time) =>
       // Get or register the blacklist Broadcast
       val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
       // Get or register the droppedWordsCounter Accumulator
@@ -158,9 +158,7 @@ object RecoverableNetworkWordCount {
     }
     val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
     val ssc = StreamingContext.getOrCreate(checkpointDirectory,
-      () => {
-        createContext(ip, port, outputPath, checkpointDirectory)
-      })
+      () => createContext(ip, port, outputPath, checkpointDirectory))
     ssc.start()
     ssc.awaitTermination()
   }
```

```diff
@@ -59,7 +59,7 @@ object SqlNetworkWordCount {
     val words = lines.flatMap(_.split(" "))

     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD { (rdd: RDD[String], time: Time) =>
+    words.foreachRDD { (rdd, time) =>
       // Get the singleton instance of SQLContext
       val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
       import sqlContext.implicits._
```

```diff
@@ -56,7 +56,9 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) {
   }

   def getVectors: JMap[String, JList[Float]] = {
-    model.getVectors.map({case (k, v) => (k, v.toList.asJava)}).asJava
+    model.getVectors.map { case (k, v) =>
+      (k, v.toList.asJava)
+    }.asJava
   }

   def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
```

```diff
@@ -189,8 +189,7 @@ class BinaryClassificationMetrics @Since("1.3.0") (
       Iterator(agg)
     }.collect()
     val partitionwiseCumulativeCounts =
-      agg.scanLeft(new BinaryLabelCounter())(
-        (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
+      agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
     val totalCount = partitionwiseCumulativeCounts.last
     logInfo(s"Total counts: $totalCount")
     val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
```

```diff
@@ -172,8 +172,8 @@ class Analyzer(
     private def assignAliases(exprs: Seq[NamedExpression]) = {
       exprs.zipWithIndex.map {
         case (expr, i) =>
-          expr transformUp {
-            case u @ UnresolvedAlias(child, optionalAliasName) => child match {
+          expr.transformUp { case u @ UnresolvedAlias(child, optionalAliasName) =>
+            child match {
               case ne: NamedExpression => ne
               case e if !e.resolved => u
               case g: Generator => MultiAlias(g, Nil)
@@ -215,7 +215,7 @@ class Analyzer(
    * represented as the bit masks.
    */
   def bitmasks(r: Rollup): Seq[Int] = {
-    Seq.tabulate(r.groupByExprs.length + 1)(idx => {(1 << idx) - 1})
+    Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
   }

   /*
```

```diff
@@ -168,9 +168,7 @@ object FromUnsafeProjection {
    * Returns an UnsafeProjection for given Array of DataTypes.
    */
   def apply(fields: Seq[DataType]): Projection = {
-    create(fields.zipWithIndex.map(x => {
-      new BoundReference(x._2, x._1, true)
-    }))
+    create(fields.zipWithIndex.map(x => new BoundReference(x._2, x._1, true)))
   }

   /**
```

```diff
@@ -314,11 +314,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       assert(children.nonEmpty)
       val (deterministic, nondeterministic) = partitionByDeterministic(condition)
       val newFirstChild = Filter(deterministic, children.head)
-      val newOtherChildren = children.tail.map {
-        child => {
-          val rewrites = buildRewrites(children.head, child)
-          Filter(pushToRight(deterministic, rewrites), child)
-        }
+      val newOtherChildren = children.tail.map { child =>
+        val rewrites = buildRewrites(children.head, child)
+        Filter(pushToRight(deterministic, rewrites), child)
       }

       Filter(nondeterministic, Union(newFirstChild +: newOtherChildren))
@@ -360,7 +358,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
       val newOutput = e.output.filter(a.references.contains(_))
       val newProjects = e.projections.map { proj =>
-        proj.zip(e.output).filter { case (e, a) =>
+        proj.zip(e.output).filter { case (_, a) =>
           newOutput.contains(a)
         }.unzip._1
       }
```

```diff
@@ -60,9 +60,7 @@ case class ShuffledHashJoin(
     val context = TaskContext.get()
     val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
     // This relation is usually used until the end of task.
-    context.addTaskCompletionListener((t: TaskContext) =>
-      relation.close()
-    )
+    context.addTaskCompletionListener(_ => relation.close())
     relation
   }
```

```diff
@@ -431,7 +431,7 @@ private[sql] object StatFunctions extends Logging {
       s"exceed 1e4. Currently $columnSize")
     val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
       val countsRow = new GenericMutableRow(columnSize + 1)
-      rows.foreach { (row: Row) =>
+      rows.foreach { row =>
         // row.get(0) is column 1
         // row.get(1) is column 2
         // row.get(2) is the frequency
```

```diff
@@ -82,7 +82,7 @@ private[hive] case class HiveSimpleUDF(
   // TODO: Finish input output types.
   override def eval(input: InternalRow): Any = {
-    val inputs = wrap(children.map(c => c.eval(input)), arguments, cached, inputDataTypes)
+    val inputs = wrap(children.map(_.eval(input)), arguments, cached, inputDataTypes)
     val ret = FunctionRegistry.invoke(
       method,
       function,
@@ -152,10 +152,8 @@ private[hive] case class HiveGenericUDF(
     var i = 0
     while (i < children.length) {
       val idx = i
-      deferredObjects(i).asInstanceOf[DeferredObjectAdapter].set(
-        () => {
-          children(idx).eval(input)
-        })
+      deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
+        .set(() => children(idx).eval(input))
       i += 1
     }
     unwrap(function.evaluate(deferredObjects), returnInspector)
```

```diff
@@ -593,7 +593,7 @@ abstract class DStream[T: ClassTag] (
    * of this DStream.
    */
   def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
-    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+    this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
   }

   /**
@@ -615,7 +615,7 @@ abstract class DStream[T: ClassTag] (
    */
   def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
       : DStream[(T, Long)] = ssc.withScope {
-    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+    this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
   }

   /**
@@ -624,7 +624,7 @@ abstract class DStream[T: ClassTag] (
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     val cleanedF = context.sparkContext.clean(foreachFunc, false)
-    foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
+    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
   }

   /**
@@ -663,7 +663,7 @@ abstract class DStream[T: ClassTag] (
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
-    transform((r: RDD[T], t: Time) => cleanedF(r))
+    transform((r: RDD[T], _: Time) => cleanedF(r))
   }

   /**
@@ -806,7 +806,7 @@ abstract class DStream[T: ClassTag] (
       windowDuration: Duration,
       slideDuration: Duration
     ): DStream[T] = ssc.withScope {
-    this.map(x => (1, x))
+    this.map((1, _))
         .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
         .map(_._2)
   }
@@ -845,7 +845,7 @@ abstract class DStream[T: ClassTag] (
       numPartitions: Int = ssc.sc.defaultParallelism)
       (implicit ord: Ordering[T] = null)
       : DStream[(T, Long)] = ssc.withScope {
-    this.map(x => (x, 1L)).reduceByKeyAndWindow(
+    this.map((_, 1L)).reduceByKeyAndWindow(
       (x: Long, y: Long) => x + y,
       (x: Long, y: Long) => x - y,
       windowDuration,
@@ -895,9 +895,9 @@ abstract class DStream[T: ClassTag] (
     logInfo(s"Slicing from $fromTime to $toTime" +
       s" (aligned to $alignedFromTime and $alignedToTime)")

-    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+    alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
       if (time >= zeroTime) getOrCompute(time) else None
-    })
+    }
   }

   /**
```