adding better error handling when indexing an RDD

This commit is contained in:
Joseph E. Gonzalez 2013-08-15 14:29:48 -07:00
parent 61281756f2
commit 3bb6e019d4

View file

@ -350,15 +350,12 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
// Get the corresponding indicies and values for this IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
assert(thisIndex.size == thisValues.size)
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[JHashMap[K, Int]]
assert(thisIndex.size == newIndex.size)
// Construct a new array Buffer to store the values
val newValues = ArrayBuffer.fill[(Seq[V], ArrayBuffer[W])](thisValues.size)(null)
// populate the newValues with the values in this IndexedRDD
for ((k,i) <- thisIndex) {
assert(i < thisValues.size)
if (thisValues(i) != null) {
newValues(i) = (thisValues(i), ArrayBuffer.empty[W])
}
@ -619,7 +616,10 @@ object IndexedRDD {
assert(!indexIter.hasNext())
val values = new Array[Seq[V]](index.size)
for ((k,v) <- tblIter) {
assert(index.contains(k))
if (!index.contains(k)) {
throw new SparkException("Error: Try to bind an external index " +
"to an RDD which contains keys that are not in the index.")
}
val ind = index(k)
if (values(ind) == null) {
values(ind) = ArrayBuffer.empty[V]