Renamed compressionOutputStream and compressionInputStream to compressedOutputStream and compressedInputStream.

This commit is contained in:
Reynold Xin 2013-07-30 18:28:46 -07:00
parent dae12fef9e
commit 98024eadc3
3 changed files with 10 additions and 10 deletions

View file

@ -30,9 +30,9 @@ import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
*/
trait CompressionCodec {
def compressionOutputStream(s: OutputStream): OutputStream
def compressedOutputStream(s: OutputStream): OutputStream
def compressionInputStream(s: InputStream): InputStream
def compressedInputStream(s: InputStream): InputStream
}
@ -59,11 +59,11 @@ private[spark] object CompressionCodec {
*/
class LZFCompressionCodec extends CompressionCodec {
override def compressionOutputStream(s: OutputStream): OutputStream = {
override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
}
override def compressionInputStream(s: InputStream): InputStream = new LZFInputStream(s)
override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
}
@ -73,10 +73,10 @@ class LZFCompressionCodec extends CompressionCodec {
*/
class SnappyCompressionCodec extends CompressionCodec {
override def compressionOutputStream(s: OutputStream): OutputStream = {
override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}
override def compressionInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

View file

@ -925,14 +925,14 @@ private[spark] class BlockManager(
* Wrap an output stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
if (shouldCompress(blockId)) compressionCodec.compressionOutputStream(s) else s
if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}
/**
* Wrap an input stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: InputStream): InputStream = {
if (shouldCompress(blockId)) compressionCodec.compressionInputStream(s) else s
if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}
def dataSerialize(

View file

@ -27,7 +27,7 @@ class CompressionCodecSuite extends FunSuite {
def testCodec(codec: CompressionCodec) {
// Write 1000 integers to the output stream, compressed.
val outputStream = new ByteArrayOutputStream()
val out = codec.compressionOutputStream(outputStream)
val out = codec.compressedOutputStream(outputStream)
for (i <- 1 until 1000) {
out.write(i % 256)
}
@ -35,7 +35,7 @@ class CompressionCodecSuite extends FunSuite {
// Read the 1000 integers back.
val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
val in = codec.compressionInputStream(inputStream)
val in = codec.compressedInputStream(inputStream)
for (i <- 1 until 1000) {
assert(in.read() === i % 256)
}