[STREAMING] [MINOR] Close files correctly when iterator is finished in streaming WAL recovery
Currently there's no chance to close the file correctly after the iteration is finished, change to `CompletionIterator` to avoid resource leakage.
Author: jerryshao <saisai.shao@intel.com>
Closes #6050 from jerryshao/close-file-correctly and squashes the following commits:
52dfaf5 [jerryshao] Close files correctly when iterator is finished
(cherry picked from commit 25c01c5484
)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
This commit is contained in:
parent
1538b10e8b
commit
9e226e1c8f
|
@ -26,7 +26,7 @@ import scala.language.postfixOps
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
|
|
||||||
import org.apache.spark.util.ThreadUtils
|
import org.apache.spark.util.{CompletionIterator, ThreadUtils}
|
||||||
import org.apache.spark.{Logging, SparkConf}
|
import org.apache.spark.{Logging, SparkConf}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,7 +124,8 @@ private[streaming] class FileBasedWriteAheadLog(
|
||||||
|
|
||||||
logFilesToRead.iterator.map { file =>
|
logFilesToRead.iterator.map { file =>
|
||||||
logDebug(s"Creating log reader with $file")
|
logDebug(s"Creating log reader with $file")
|
||||||
new FileBasedWriteAheadLogReader(file, hadoopConf)
|
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
|
||||||
|
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
|
||||||
} flatMap { x => x }
|
} flatMap { x => x }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue