[SPARK-7389] [CORE] Tachyon integration improvement

Two main changes:

1. Add two functions, putValues and getValues, to ExternalBlockManager, because an implementation may not want to rely on putBytes and getBytes; a sketch of the extended interface follows.
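
A minimal sketch of the extended interface, condensed from the ExternalBlockManager changes in the diff below (names and defaults come from this patch): the new methods get default implementations that fall back to the byte-based path, so existing backends keep working unchanged.

    import java.nio.ByteBuffer

    private[spark] abstract class ExternalBlockManager {
      protected var blockManager: BlockManager = _

      def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
      def getBytes(blockId: BlockId): Option[ByteBuffer]

      // Default: serialize the whole iterator into a ByteBuffer, then delegate to putBytes.
      def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
        putBytes(blockId, blockManager.dataSerialize(blockId, values))
      }

      // Default: fetch the whole block as bytes, then deserialize it.
      def getValues(blockId: BlockId): Option[Iterator[_]] = {
        getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
      }
    }

Backends that can stream, such as the Tachyon one, override both methods to avoid materializing a block as a single on-heap ByteBuffer.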

2. Improve the Tachyon integration itself.
Currently, when putting data into Tachyon, Spark first serializes all data in a partition into a ByteBuffer and then writes it to Tachyon; this uses a lot of memory and increases GC overhead.

When getting data from Tachyon, getValues depends on getBytes, which likewise reads all the data into an on-heap byte array and again results in high memory usage.
This PR changes the approach of both functions, making them read and write data by stream to reduce memory usage; a sketch of the streaming path follows.
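
A rough sketch of the streaming path in the Tachyon-backed manager, condensed from the TachyonBlockManager changes in the diff below (the error handling that cancels the stream on failure is omitted here):

    override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
      val os = getFile(blockId).getOutStream(WriteType.TRY_CACHE)
      try {
        // Serialize directly onto the Tachyon output stream; no intermediate ByteBuffer.
        blockManager.dataSerializeStream(blockId, os, values)
      } finally {
        os.close()
      }
    }

    override def getValues(blockId: BlockId): Option[Iterator[_]] = {
      val file = getFile(blockId)
      if (file == null || file.getLocationHosts().size() == 0) {
        None
      } else {
        // Wrap the Tachyon input stream in a lazily deserializing iterator instead of
        // reading the whole block into an on-heap byte array first.
        Option(file.getInStream(ReadType.CACHE)).map { is =>
          blockManager.dataDeserializeStream(blockId, is)
        }
      }
    }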

In our testing with large data sizes, this patch reduces GC time by about 30% and full GC time by about 70%, and total execution time by about 10%.

Author: Mingfei <mingfei.shi@intel.com>

Closes #5908 from shimingfei/Tachyon-integration-rebase and squashes the following commits:

033bc57 [Mingfei] modify accroding to comments
747c69a [Mingfei] modify according to comments - format changes
ce52c67 [Mingfei] put close() in a finally block
d2c60bb [Mingfei] modify according to comments, some code style change
4c11591 [Mingfei] modify according to comments split putIntoExternalBlockStore into two functions add default implementation for getValues and putValues
cc0a32e [Mingfei] Make getValues read data from Tachyon by stream Make putValues write data to Tachyon by stream
017593d [Mingfei] add getValues and putValues in ExternalBlockManager's Interface
Mingfei 2015-05-20 22:33:03 -07:00 committed by Patrick Wendell
parent d0eb9ffe97
commit 04940c4975
4 changed files with 151 additions and 50 deletions


@@ -17,7 +17,7 @@
package org.apache.spark.storage
import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream, OutputStream}
import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -489,16 +489,17 @@ private[spark] class BlockManager(
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
externalBlockStore.getBytes(blockId) match {
case Some(bytes) =>
if (!asBlockResult) {
return Some(bytes)
val result = if (asBlockResult) {
externalBlockStore.getValues(blockId)
.map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
return Some(new BlockResult(
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
externalBlockStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in externalBlockStore")
logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}
@@ -1206,8 +1207,19 @@ private[spark] class BlockManager(
bytes: ByteBuffer,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind()
val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
serializer.newInstance().deserializeStream(stream).asIterator
dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
}
/**
* Deserializes an InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
def dataDeserializeStream(
blockId: BlockId,
inputStream: InputStream,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
val stream = new BufferedInputStream(inputStream)
serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
}
def stop(): Unit = {


@@ -32,6 +32,8 @@ import java.nio.ByteBuffer
*/
private[spark] abstract class ExternalBlockManager {
protected var blockManager: BlockManager = _
override def toString: String = {"External Block Store"}
/**
@@ -41,7 +43,9 @@ private[spark] abstract class ExternalBlockManager {
*
* @throws java.io.IOException if there is any file system failure during the initialization.
*/
def init(blockManager: BlockManager, executorId: String): Unit
def init(blockManager: BlockManager, executorId: String): Unit = {
this.blockManager = blockManager
}
/**
* Drop the block from the underlying external block store, if it exists.
@@ -73,6 +77,11 @@ private[spark] abstract class ExternalBlockManager {
*/
def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
val bytes = blockManager.dataSerialize(blockId, values)
putBytes(blockId, bytes)
}
/**
* Retrieve the block bytes.
* @return Some(ByteBuffer) if the block bytes is successfully retrieved
@@ -82,6 +91,17 @@ private[spark] abstract class ExternalBlockManager {
*/
def getBytes(blockId: BlockId): Option[ByteBuffer]
/**
* Retrieve the block data.
* @return Some(Iterator[Any]) if the block data is successfully retrieved
* None if the block does not exist in the external block store.
*
* @throws java.io.IOException if there is any file system failure in getting the block.
*/
def getValues(blockId: BlockId): Option[Iterator[_]] = {
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
/**
* Get the size of the block saved in the underlying external block store,
* which is saved before by putBytes.


@@ -18,9 +18,11 @@
package org.apache.spark.storage
import java.nio.ByteBuffer
import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import scala.util.control.NonFatal
/**
@@ -40,7 +42,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
} catch {
case NonFatal(t) =>
logError(s"error in getSize from $blockId", t)
logError(s"Error in getSize($blockId)", t)
0L
}
}
@@ -54,7 +56,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
putIterator(blockId, values.toIterator, level, returnValues)
putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
}
override def putIterator(
@@ -62,42 +64,70 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
logDebug(s"Attempting to write values for block $blockId")
val bytes = blockManager.dataSerialize(blockId, values)
putIntoExternalBlockStore(blockId, bytes, returnValues)
putIntoExternalBlockStore(blockId, values, returnValues)
}
private def putIntoExternalBlockStore(
blockId: BlockId,
values: Iterator[_],
returnValues: Boolean): PutResult = {
logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
// we should never hit here if externalBlockManager is None. Handle it anyway for safety.
try {
val startTime = System.currentTimeMillis
if (externalBlockManager.isDefined) {
externalBlockManager.get.putValues(blockId, values)
val size = getSize(blockId)
val data = if (returnValues) {
Left(getValues(blockId).get)
} else {
null
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
blockId, Utils.bytesToString(size), finishTime - startTime))
PutResult(size, data)
} else {
logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
} catch {
case NonFatal(t) =>
logError(s"Error in putValues($blockId)", t)
PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
}
private def putIntoExternalBlockStore(
blockId: BlockId,
bytes: ByteBuffer,
returnValues: Boolean): PutResult = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val byteBuffer = bytes.duplicate()
byteBuffer.rewind()
logDebug(s"Attempting to put block $blockId into ExtBlk store")
logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
// we should never hit here if externalBlockManager is None. Handle it anyway for safety.
try {
val startTime = System.currentTimeMillis
if (externalBlockManager.isDefined) {
externalBlockManager.get.putBytes(blockId, bytes)
val byteBuffer = bytes.duplicate()
byteBuffer.rewind()
externalBlockManager.get.putBytes(blockId, byteBuffer)
val size = bytes.limit()
val data = if (returnValues) {
Right(bytes)
} else {
null
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
if (returnValues) {
PutResult(bytes.limit(), Right(bytes.duplicate()))
blockId, Utils.bytesToString(size), finishTime - startTime))
PutResult(size, data)
} else {
PutResult(bytes.limit(), null)
}
} else {
logError(s"error in putBytes $blockId")
PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
} catch {
case NonFatal(t) =>
logError(s"error in putBytes $blockId", t)
PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
logError(s"Error in putBytes($blockId)", t)
PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
}
}
@@ -107,13 +137,19 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
} catch {
case NonFatal(t) =>
logError(s"error in removing $blockId", t)
logError(s"Error in removeBlock($blockId)", t)
true
}
}
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
try {
externalBlockManager.flatMap(_.getValues(blockId))
} catch {
case NonFatal(t) =>
logError(s"Error in getValues($blockId)", t)
None
}
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -121,7 +157,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
externalBlockManager.flatMap(_.getBytes(blockId))
} catch {
case NonFatal(t) =>
logError(s"error in getBytes from $blockId", t)
logError(s"Error in getBytes($blockId)", t)
None
}
}
@@ -130,13 +166,13 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
try {
val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
if (!ret) {
logInfo(s"remove block $blockId")
logInfo(s"Remove block $blockId")
blockManager.removeBlock(blockId, true)
}
ret
} catch {
case NonFatal(t) =>
logError(s"error in getBytes from $blockId", t)
logError(s"Error in getBytes($blockId)", t)
false
}
}


@@ -22,7 +22,10 @@ import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, Random}
import scala.util.control.NonFatal
import com.google.common.io.ByteStreams
import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
import tachyon.TachyonURI
@@ -38,7 +41,6 @@ import org.apache.spark.util.Utils
*/
private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
var blockManager: BlockManager =_
var rootDirs: String = _
var master: String = _
var client: tachyon.client.TachyonFS = _
@@ -52,7 +54,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def init(blockManager: BlockManager, executorId: String): Unit = {
this.blockManager = blockManager
super.init(blockManager, executorId)
val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
@@ -95,9 +97,30 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
val file = getFile(blockId)
val os = file.getOutStream(WriteType.TRY_CACHE)
try {
os.write(bytes.array())
} catch {
case NonFatal(e) =>
logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
os.cancel()
} finally {
os.close()
}
}
override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
val file = getFile(blockId)
val os = file.getOutStream(WriteType.TRY_CACHE)
try {
blockManager.dataSerializeStream(blockId, os, values)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to put values of block $blockId into Tachyon", e)
os.cancel()
} finally {
os.close()
}
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val file = getFile(blockId)
@@ -105,21 +128,31 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
return None
}
val is = file.getInStream(ReadType.CACHE)
assert (is != null)
try {
val size = file.length
val bs = new Array[Byte](size.asInstanceOf[Int])
ByteStreams.readFully(is, bs)
Some(ByteBuffer.wrap(bs))
} catch {
case ioe: IOException =>
logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
case NonFatal(e) =>
logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
None
} finally {
is.close()
}
}
override def getValues(blockId: BlockId): Option[Iterator[_]] = {
val file = getFile(blockId)
if (file == null || file.getLocationHosts().size() == 0) {
return None
}
val is = file.getInStream(ReadType.CACHE)
Option(is).map { is =>
blockManager.dataDeserializeStream(blockId, is)
}
}
override def getSize(blockId: BlockId): Long = {
getFile(blockId.name).length
}
@@ -184,7 +217,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
tachyonDir = client.getFile(path)
}
} catch {
case e: Exception =>
case NonFatal(e) =>
logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
}
}
@@ -206,7 +239,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
case e: Exception =>
case NonFatal(e) =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}