Fixed bug in CheckpointSuite

This commit is contained in:
Tathagata Das 2013-02-20 10:26:36 -08:00
parent 1cb725e417
commit 334ab92441
2 changed files with 6 additions and 6 deletions

View file

@ -22,10 +22,10 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
override def getPartitions: Array[Partition] = {
val dirContents = fs.listStatus(new Path(checkpointPath))
val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
val numPartitions = splitFiles.size
if (numPartitions > 0 && !splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
!splitFiles(numPartitions-1).endsWith(CheckpointRDD.splitIdToFile(numPartitions-1))) {
val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
val numPartitions = partitionFiles.size
if (numPartitions > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPartitions-1).endsWith(CheckpointRDD.splitIdToFile(numPartitions-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
}
Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))

View file

@ -164,12 +164,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("CheckpointRDD with zero partitions") {
val rdd = new BlockRDD[Int](sc, Array[String]())
assert(rdd.splits.size === 0)
assert(rdd.partitions.size === 0)
assert(rdd.isCheckpointed === false)
rdd.checkpoint()
assert(rdd.count() === 0)
assert(rdd.isCheckpointed === true)
assert(rdd.splits.size === 0)
assert(rdd.partitions.size === 0)
}
/**