diff --git a/conf/java-opts b/conf/java-opts index 168f0f0da1..63c44f7639 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.CustomBlockedInMemoryShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500 +-Dspark.shuffle.class=spark.TrackedCustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500 diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala index 9a43e442b3..483d79a00a 100644 --- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala @@ -569,7 +569,7 @@ object CustomBlockedInMemoryShuffle extends Logging { var requestedSplitBase = "%s/%d/%d/%d".format( shuffleDir, shuffleId, myIndex, outputId) - logInfo("requestedSplit: " + requestedSplitBase) + logInfo("requestedSplitBase: " + requestedSplitBase) // Read BLOCKNUM and send back the total number of blocks val blockNumName = "%s/%d/%d/BLOCKNUM-%d".format(shuffleDir, diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala index f84d2e3ae3..36049509dd 100644 --- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala @@ -470,68 +470,73 @@ extends Shuffle[K, V, C] with Logging { // Turn the timer OFF, if the sender responds before timeout timeOutTimer.cancel() - // Request specific block - oosSource.writeObject(hasBlocksInSplit(splitIndex)) - - // Good to go. First, receive the length of the requested file - var requestedFileLen = oisSource.readObject.asInstanceOf[Int] - logInfo("Received requestedFileLen = " + requestedFileLen) + while (hasBlocksInSplit(splitIndex) < totalBlocksInSplit(splitIndex)) { + // Set receptionSucceeded to false before trying for each block + receptionSucceeded = false - // Create a temp variable to be used in different places - val requestPath = "http://%s:%d/shuffle/%s-%d".format( - serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit, - hasBlocksInSplit(splitIndex)) - - // Receive the file - if (requestedFileLen != -1) { - val readStartTime = System.currentTimeMillis - logInfo("BEGIN READ: " + requestPath) - - // Receive data in an Array[Byte] - var recvByteArray = new Array[Byte](requestedFileLen) - var alreadyRead = 0 - var bytesRead = 0 + // Request specific block + oosSource.writeObject(hasBlocksInSplit(splitIndex)) - while (alreadyRead != requestedFileLen) { - bytesRead = isSource.read(recvByteArray, alreadyRead, - requestedFileLen - alreadyRead) - if (bytesRead > 0) { - alreadyRead = alreadyRead + bytesRead - } - } - - // Make it available to the consumer - try { - receivedData.put((splitIndex, recvByteArray)) - } catch { - case e: Exception => { - logInfo("Exception during putting data into receivedData") - } - } - - // TODO: Updating stats before consumption is completed - hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 - - // Split has been received only if all the blocks have been received - if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - } + // Good to go. First, receive the length of the requested file + var requestedFileLen = oisSource.readObject.asInstanceOf[Int] + logInfo("Received requestedFileLen = " + requestedFileLen) - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) + // Create a temp variable to be used in different places + val requestPath = "http://%s:%d/shuffle/%s-%d".format( + serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit, + hasBlocksInSplit(splitIndex)) + + // Receive the file + if (requestedFileLen != -1) { + val readStartTime = System.currentTimeMillis + logInfo("BEGIN READ: " + requestPath) + + // Receive data in an Array[Byte] + var recvByteArray = new Array[Byte](requestedFileLen) + var alreadyRead = 0 + var bytesRead = 0 + + while (alreadyRead != requestedFileLen) { + bytesRead = isSource.read(recvByteArray, alreadyRead, + requestedFileLen - alreadyRead) + if (bytesRead > 0) { + alreadyRead = alreadyRead + bytesRead + } + } + + // Make it available to the consumer + try { + receivedData.put((splitIndex, recvByteArray)) + } catch { + case e: Exception => { + logInfo("Exception during putting data into receivedData") + } + } + + // TODO: Updating stats before consumption is completed + hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 + + // Split has been received only if all the blocks have been received + if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + } + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } + + receptionSucceeded = true + + logInfo("END READ: " + requestPath) + val readTime = System.currentTimeMillis - readStartTime + logInfo("Reading " + requestPath + " took " + readTime + " millis.") + } else { + throw new SparkException("ShuffleServer " + serversplitInfo.hostAddress + " does not have " + requestSplit) } - - receptionSucceeded = true - - logInfo("END READ: " + requestPath) - val readTime = System.currentTimeMillis - readStartTime - logInfo("Reading " + requestPath + " took " + readTime + " millis.") - } else { - throw new SparkException("ShuffleServer " + serversplitInfo.hostAddress + " does not have " + requestSplit) } } catch { // EOFException is expected to happen because sender can break @@ -752,13 +757,13 @@ object TrackedCustomBlockedLocalFileShuffle extends Logging { override def run: Unit = { try { // Receive basic path information - var requestPath = ois.readObject.asInstanceOf[String] + var requestPathBase = ois.readObject.asInstanceOf[String] - logInfo("requestPath: " + requestPath) + logInfo("requestPathBase: " + requestPathBase) // Read BLOCKNUM file and send back the total number of blocks val blockNumFilePath = "%s/%s-BLOCKNUM".format(shuffleDir, - requestPath) + requestPathBase) val blockNumIn = new ObjectInputStream(new FileInputStream(blockNumFilePath)) val BLOCKNUM = blockNumIn.readObject.asInstanceOf[Int] @@ -766,60 +771,80 @@ object TrackedCustomBlockedLocalFileShuffle extends Logging { oos.writeObject(BLOCKNUM) - // Receive specific block request - val blockId = ois.readObject.asInstanceOf[Int] + val startTime = System.currentTimeMillis + var curTime = startTime + var keepSending = true + var numBlocksToSend = Shuffle.MaxChatBlocks - // Ready to send - requestPath = requestPath + "-" + blockId - - // Open the file - var requestedFile: File = null - var requestedFileLen = -1 - try { - requestedFile = new File(shuffleDir + "/" + requestPath) - requestedFileLen = requestedFile.length.toInt - } catch { - case e: Exception => { } - } - - // Send the length of the requestPath to let the receiver know that - // transfer is about to start - // In the case of receiver timeout and connection close, this will - // throw a java.net.SocketException: Broken pipe - oos.writeObject(requestedFileLen) - oos.flush() - - logInfo("requestedFileLen = " + requestedFileLen) - - // Read and send the requested file - if (requestedFileLen != -1) { - // Read - var byteArray = new Array[Byte](requestedFileLen) - val bis = - new BufferedInputStream(new FileInputStream(requestedFile)) - - var bytesRead = bis.read(byteArray, 0, byteArray.length) - var alreadyRead = bytesRead - - while (alreadyRead < requestedFileLen) { - bytesRead = bis.read(byteArray, alreadyRead, - (byteArray.length - alreadyRead)) - if(bytesRead > 0) { - alreadyRead = alreadyRead + bytesRead - } - } - bis.close() + while (keepSending && numBlocksToSend > 0) { + // Receive specific block request + val blockId = ois.readObject.asInstanceOf[Int] - // Send - bos.write(byteArray, 0, byteArray.length) - bos.flush() - } else { - // Close the connection + // Ready to send + var requestPath = requestPathBase + "-" + blockId + + // Open the file + var requestedFile: File = null + var requestedFileLen = -1 + try { + requestedFile = new File(shuffleDir + "/" + requestPath) + requestedFileLen = requestedFile.length.toInt + } catch { + case e: Exception => { } + } + + // Send the length of the requestPath to let the receiver know that + // transfer is about to start + // In the case of receiver timeout and connection close, this will + // throw a java.net.SocketException: Broken pipe + oos.writeObject(requestedFileLen) + oos.flush() + + logInfo("requestedFileLen = " + requestedFileLen) + + // Read and send the requested file + if (requestedFileLen != -1) { + // Read + var byteArray = new Array[Byte](requestedFileLen) + val bis = + new BufferedInputStream(new FileInputStream(requestedFile)) + + var bytesRead = bis.read(byteArray, 0, byteArray.length) + var alreadyRead = bytesRead + + while (alreadyRead < requestedFileLen) { + bytesRead = bis.read(byteArray, alreadyRead, + (byteArray.length - alreadyRead)) + if(bytesRead > 0) { + alreadyRead = alreadyRead + bytesRead + } + } + bis.close() + + // Send + bos.write(byteArray, 0, byteArray.length) + bos.flush() + + // Update loop variables + numBlocksToSend = numBlocksToSend - 1 + + curTime = System.currentTimeMillis + // Revoke sending only if there is anyone waiting in the queue + if (curTime - startTime >= Shuffle.MaxChatTime && + threadPool.getQueue.size > 0) { + keepSending = false + } + } else { + // Close the connection + } } } catch { // If something went wrong, e.g., the worker at the other end died etc // then close everything up // Exception can happen if the receiver stops receiving + // EOFException is expected to happen because receiver can break + // connection as soon as it has all the blocks + case eofe: java.io.EOFException => { } case e: Exception => { logInfo("ShuffleServerThread had a " + e) }