Renamed BlockedLocalFileShuffle to HttpBlockedLocalFileShuffle for merging with the mos-shuffle branch.

This commit is contained in:
Mosharaf Chowdhury 2010-12-19 14:02:19 -08:00
parent 62d61ed928
commit a83a722256
2 changed files with 13 additions and 14 deletions

View file

@ -1 +1 @@
-Dspark.shuffle.class=spark.BlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.shuffle.class=spark.HttpBlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50

View file

@ -22,7 +22,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
* TODO: Add support for compression when spark.compress is set to true. * TODO: Add support for compression when spark.compress is set to true.
*/ */
@serializable @serializable
class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { class HttpBlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
@transient var totalSplits = 0 @transient var totalSplits = 0
@transient var hasSplits = 0 @transient var hasSplits = 0
@ -43,7 +43,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
: RDD[(K, C)] = : RDD[(K, C)] =
{ {
val sc = input.sparkContext val sc = input.sparkContext
val shuffleId = BlockedLocalFileShuffle.newShuffleId() val shuffleId = HttpBlockedLocalFileShuffle.newShuffleId()
logInfo("Shuffle ID: " + shuffleId) logInfo("Shuffle ID: " + shuffleId)
val splitRdd = new NumberedSplitRDD(input) val splitRdd = new NumberedSplitRDD(input)
@ -70,14 +70,14 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
for (i <- 0 until numOutputSplits) { for (i <- 0 until numOutputSplits) {
// Open the INDEX file // Open the INDEX file
var indexFile: File = var indexFile: File =
BlockedLocalFileShuffle.getBlockIndexOutputFile(shuffleId, myIndex, i) HttpBlockedLocalFileShuffle.getBlockIndexOutputFile(shuffleId, myIndex, i)
var indexOut = new ObjectOutputStream(new FileOutputStream(indexFile)) var indexOut = new ObjectOutputStream(new FileOutputStream(indexFile))
var indexDirty: Boolean = true var indexDirty: Boolean = true
var alreadyWritten: Long = 0 var alreadyWritten: Long = 0
// Open the actual file // Open the actual file
var file: File = var file: File =
BlockedLocalFileShuffle.getOutputFile(shuffleId, myIndex, i) HttpBlockedLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
val out = new ObjectOutputStream(new FileOutputStream(file)) val out = new ObjectOutputStream(new FileOutputStream(file))
val writeStartTime = System.currentTimeMillis val writeStartTime = System.currentTimeMillis
@ -89,7 +89,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
indexDirty = true indexDirty = true
// Update the INDEX file if more than blockSize limit has been written // Update the INDEX file if more than blockSize limit has been written
if (file.length - alreadyWritten > BlockedLocalFileShuffle.BlockSize) { if (file.length - alreadyWritten > HttpBlockedLocalFileShuffle.BlockSize) {
indexOut.writeObject(file.length) indexOut.writeObject(file.length)
indexDirty = false indexDirty = false
alreadyWritten = file.length alreadyWritten = file.length
@ -109,7 +109,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.") logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
} }
(myIndex, BlockedLocalFileShuffle.serverUri) (myIndex, HttpBlockedLocalFileShuffle.serverUri)
}).collect() }).collect()
// TODO: Could broadcast outputLocs // TODO: Could broadcast outputLocs
@ -129,12 +129,12 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
combiners = new HashMap[K, C] combiners = new HashMap[K, C]
var threadPool = BlockedLocalFileShuffle.newDaemonFixedThreadPool( var threadPool = HttpBlockedLocalFileShuffle.newDaemonFixedThreadPool(
BlockedLocalFileShuffle.MaxConnections) HttpBlockedLocalFileShuffle.MaxConnections)
while (hasSplits < totalSplits) { while (hasSplits < totalSplits) {
var numThreadsToCreate = var numThreadsToCreate =
Math.min(totalSplits, BlockedLocalFileShuffle.MaxConnections) - Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxConnections) -
threadPool.getActiveCount threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) { while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@ -157,7 +157,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
} }
// Sleep for a while before creating new threads // Sleep for a while before creating new threads
Thread.sleep(BlockedLocalFileShuffle.MinKnockInterval) Thread.sleep(HttpBlockedLocalFileShuffle.MinKnockInterval)
} }
combiners combiners
}) })
@ -175,7 +175,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
} }
if (requiredSplits.size > 0) { if (requiredSplits.size > 0) {
requiredSplits(BlockedLocalFileShuffle.ranGen.nextInt( requiredSplits(HttpBlockedLocalFileShuffle.ranGen.nextInt(
requiredSplits.size)) requiredSplits.size))
} else { } else {
-1 -1
@ -292,8 +292,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
} }
} }
object HttpBlockedLocalFileShuffle extends Logging {
object BlockedLocalFileShuffle extends Logging {
// Used thoughout the code for small and large waits/timeouts // Used thoughout the code for small and large waits/timeouts
private var BlockSize_ = 1024 * 1024 private var BlockSize_ = 1024 * 1024