Merge branch 'streaming' into streaming-java-api
This commit is contained in:
commit
ee0314c3b3
|
@ -80,12 +80,12 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
val serializer = SparkEnv.get.serializer.newInstance()
|
||||
val serializeStream = serializer.serializeStream(fileOutputStream)
|
||||
serializeStream.writeAll(iterator)
|
||||
fileOutputStream.close()
|
||||
serializeStream.close()
|
||||
|
||||
if (!fs.rename(tempOutputPath, finalOutputPath)) {
|
||||
if (!fs.delete(finalOutputPath, true)) {
|
||||
throw new IOException("Checkpoint failed: failed to delete earlier output of task "
|
||||
+ context.attemptId);
|
||||
+ context.attemptId)
|
||||
}
|
||||
if (!fs.rename(tempOutputPath, finalOutputPath)) {
|
||||
throw new IOException("Checkpoint failed: failed to save output of task: "
|
||||
|
@ -119,7 +119,7 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
|
||||
val path = new Path(hdfsPath, "temp")
|
||||
val fs = path.getFileSystem(new Configuration())
|
||||
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 10) _)
|
||||
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
|
||||
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
|
||||
assert(cpRDD.splits.length == rdd.splits.length, "Number of splits is not the same")
|
||||
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of splits not the same")
|
||||
|
|
|
@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase {
|
|||
)
|
||||
|
||||
val updateStateOperation = (s: DStream[String]) => {
|
||||
val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
|
||||
Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
|
||||
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
|
||||
Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
|
||||
}
|
||||
s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
|
||||
s.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
|
||||
}
|
||||
|
||||
testOperation(inputData, updateStateOperation, outputData, true)
|
||||
|
|
Loading…
Reference in a new issue