TrackedCustomBlockedLocalFileShuffle has also been updated.

This commit is contained in:
Mosharaf Chowdhury 2010-12-30 18:24:12 -08:00
parent a30f03eae6
commit 33d59fb206
3 changed files with 135 additions and 110 deletions

View file

@ -1 +1 @@
-Dspark.shuffle.class=spark.CustomBlockedInMemoryShuffle -Dspark.shuffle.masterHostAddress= -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500
-Dspark.shuffle.class=spark.TrackedCustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress= -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500

View file

@ -569,7 +569,7 @@ object CustomBlockedInMemoryShuffle extends Logging {
var requestedSplitBase = "%s/%d/%d/%d".format(
shuffleDir, shuffleId, myIndex, outputId)
logInfo("requestedSplit: " + requestedSplitBase)
logInfo("requestedSplitBase: " + requestedSplitBase)
// Read BLOCKNUM and send back the total number of blocks
val blockNumName = "%s/%d/%d/BLOCKNUM-%d".format(shuffleDir,

View file

@ -470,68 +470,73 @@ extends Shuffle[K, V, C] with Logging {
// Turn the timer OFF, if the sender responds before timeout
// Request specific block
// Good to go. First, receive the length of the requested file
var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
logInfo("Received requestedFileLen = " + requestedFileLen)
while (hasBlocksInSplit(splitIndex) < totalBlocksInSplit(splitIndex)) {
// Set receptionSucceeded to false before trying for each block
receptionSucceeded = false
// Create a temp variable to be used in different places
val requestPath = "http://%s:%d/shuffle/%s-%d".format(
serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit,
// Receive the file
if (requestedFileLen != -1) {
val readStartTime = System.currentTimeMillis
logInfo("BEGIN READ: " + requestPath)
// Receive data in an Array[Byte]
var recvByteArray = new Array[Byte](requestedFileLen)
var alreadyRead = 0
var bytesRead = 0
// Request specific block
while (alreadyRead != requestedFileLen) {
bytesRead =, alreadyRead,
requestedFileLen - alreadyRead)
if (bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
// Make it available to the consumer
try {
receivedData.put((splitIndex, recvByteArray))
} catch {
case e: Exception => {
logInfo("Exception during putting data into receivedData")
// TODO: Updating stats before consumption is completed
hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
// Split has been received only if all the blocks have been received
if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
hasSplitsBitVector.synchronized {
hasSplits += 1
// Good to go. First, receive the length of the requested file
var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
logInfo("Received requestedFileLen = " + requestedFileLen)
// We have received splitIndex
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex, false)
// Create a temp variable to be used in different places
val requestPath = "http://%s:%d/shuffle/%s-%d".format(
serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit,
// Receive the file
if (requestedFileLen != -1) {
val readStartTime = System.currentTimeMillis
logInfo("BEGIN READ: " + requestPath)
// Receive data in an Array[Byte]
var recvByteArray = new Array[Byte](requestedFileLen)
var alreadyRead = 0
var bytesRead = 0
while (alreadyRead != requestedFileLen) {
bytesRead =, alreadyRead,
requestedFileLen - alreadyRead)
if (bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
// Make it available to the consumer
try {
receivedData.put((splitIndex, recvByteArray))
} catch {
case e: Exception => {
logInfo("Exception during putting data into receivedData")
// TODO: Updating stats before consumption is completed
hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
// Split has been received only if all the blocks have been received
if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
hasSplitsBitVector.synchronized {
hasSplits += 1
// We have received splitIndex
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex, false)
receptionSucceeded = true
logInfo("END READ: " + requestPath)
val readTime = System.currentTimeMillis - readStartTime
logInfo("Reading " + requestPath + " took " + readTime + " millis.")
} else {
throw new SparkException("ShuffleServer " + serversplitInfo.hostAddress + " does not have " + requestSplit)
receptionSucceeded = true
logInfo("END READ: " + requestPath)
val readTime = System.currentTimeMillis - readStartTime
logInfo("Reading " + requestPath + " took " + readTime + " millis.")
} else {
throw new SparkException("ShuffleServer " + serversplitInfo.hostAddress + " does not have " + requestSplit)
} catch {
// EOFException is expected to happen because sender can break
@ -752,13 +757,13 @@ object TrackedCustomBlockedLocalFileShuffle extends Logging {
override def run: Unit = {
try {
// Receive basic path information
var requestPath = ois.readObject.asInstanceOf[String]
var requestPathBase = ois.readObject.asInstanceOf[String]
logInfo("requestPath: " + requestPath)
logInfo("requestPathBase: " + requestPathBase)
// Read BLOCKNUM file and send back the total number of blocks
val blockNumFilePath = "%s/%s-BLOCKNUM".format(shuffleDir,
val blockNumIn =
new ObjectInputStream(new FileInputStream(blockNumFilePath))
val BLOCKNUM = blockNumIn.readObject.asInstanceOf[Int]
@ -766,60 +771,80 @@ object TrackedCustomBlockedLocalFileShuffle extends Logging {
// Receive specific block request
val blockId = ois.readObject.asInstanceOf[Int]
val startTime = System.currentTimeMillis
var curTime = startTime
var keepSending = true
var numBlocksToSend = Shuffle.MaxChatBlocks
// Ready to send
requestPath = requestPath + "-" + blockId
// Open the file
var requestedFile: File = null
var requestedFileLen = -1
try {
requestedFile = new File(shuffleDir + "/" + requestPath)
requestedFileLen = requestedFile.length.toInt
} catch {
case e: Exception => { }
// Send the length of the requestPath to let the receiver know that
// transfer is about to start
// In the case of receiver timeout and connection close, this will
// throw a Broken pipe
logInfo("requestedFileLen = " + requestedFileLen)
// Read and send the requested file
if (requestedFileLen != -1) {
// Read
var byteArray = new Array[Byte](requestedFileLen)
val bis =
new BufferedInputStream(new FileInputStream(requestedFile))
var bytesRead =, 0, byteArray.length)
var alreadyRead = bytesRead
while (alreadyRead < requestedFileLen) {
bytesRead =, alreadyRead,
(byteArray.length - alreadyRead))
if(bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
while (keepSending && numBlocksToSend > 0) {
// Receive specific block request
val blockId = ois.readObject.asInstanceOf[Int]
// Send
bos.write(byteArray, 0, byteArray.length)
} else {
// Close the connection
// Ready to send
var requestPath = requestPathBase + "-" + blockId
// Open the file
var requestedFile: File = null
var requestedFileLen = -1
try {
requestedFile = new File(shuffleDir + "/" + requestPath)
requestedFileLen = requestedFile.length.toInt
} catch {
case e: Exception => { }
// Send the length of the requestPath to let the receiver know that
// transfer is about to start
// In the case of receiver timeout and connection close, this will
// throw a Broken pipe
logInfo("requestedFileLen = " + requestedFileLen)
// Read and send the requested file
if (requestedFileLen != -1) {
// Read
var byteArray = new Array[Byte](requestedFileLen)
val bis =
new BufferedInputStream(new FileInputStream(requestedFile))
var bytesRead =, 0, byteArray.length)
var alreadyRead = bytesRead
while (alreadyRead < requestedFileLen) {
bytesRead =, alreadyRead,
(byteArray.length - alreadyRead))
if(bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
// Send
bos.write(byteArray, 0, byteArray.length)
// Update loop variables
numBlocksToSend = numBlocksToSend - 1
curTime = System.currentTimeMillis
// Revoke sending only if there is anyone waiting in the queue
if (curTime - startTime >= Shuffle.MaxChatTime &&
threadPool.getQueue.size > 0) {
keepSending = false
} else {
// Close the connection
} catch {
// If something went wrong, e.g., the worker at the other end died etc
// then close everything up
// Exception can happen if the receiver stops receiving
// EOFException is expected to happen because receiver can break
// connection as soon as it has all the blocks
case eofe: => { }
case e: Exception => {
logInfo("ShuffleServerThread had a " + e)