[SPARK-17159][STREAM] Significant speed up for running spark streaming against Object store.

## What changes were proposed in this pull request?

Original work by Steve Loughran.
Based on #17745.

This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. Each call to file status is 3+ http calls to object store. This patch eliminates the need for it, by using FileStatus objects.

This is a minor optimisation when working with filesystems, but significant when working with object stores.

## How was this patch tested?

Tests included. Existing tests pass.

Closes #22339 from ScrapCodes/PR_17745.

Lead-authored-by: Prashant Sharma <prashant@apache.org>
Co-authored-by: Steve Loughran <stevel@hortonworks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
Prashant Sharma 2018-10-05 02:22:06 +01:00 committed by Sean Owen
parent 95ae209461
commit 3ae4f07de0
3 changed files with 118 additions and 51 deletions

View file

@ -17,19 +17,19 @@
package org.apache.spark.streaming.dstream
import java.io.{IOException, ObjectInputStream}
import java.io.{FileNotFoundException, IOException, ObjectInputStream}
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
// Read-through cache of file mod times, used to speed up mod time lookups
@transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L
@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
* granularity of seconds. And new files may have the same modification time as the
* granularity of seconds in HDFS. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in
* the previous call.
*/
@ -174,8 +171,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
// Delete file mod times that weren't accessed in the last round of getting new files
fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}
/**
@ -197,29 +192,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val directoryFilter = new PathFilter {
override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
.filter(_.isDirectory)
.map(_.getPath)
val newFiles = directories.flatMap(dir =>
fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
fs.listStatus(dir)
.filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
.map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
logDebug(s"Finding new files took $timeTaken ms")
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
s"Time taken to find new files $timeTaken exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
"files in the monitored directories."
)
}
newFiles
} catch {
case e: FileNotFoundException =>
logWarning(s"No directory to scan: $directoryPath: $e")
Array.empty
case e: Exception =>
logWarning("Error finding new files", e)
logWarning(s"Error finding new files under $directoryPath", e)
reset()
Array.empty
}
@ -242,8 +237,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
* Hence they can get selected as new files again. To prevent this, files whose mod time is more
* than current batch time are not considered.
* @param fileStatus file status
* @param currentTime time of the batch
* @param modTimeIgnoreThreshold the ignore threshold
* @return true if the file has been modified within the batch window
*/
private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
private def isNewFile(
fileStatus: FileStatus,
currentTime: Long,
modTimeIgnoreThreshold: Long): Boolean = {
val path = fileStatus.getPath
val pathStr = path.toString
// Reject file if it does not satisfy filter
if (!filter(path)) {
@ -251,7 +254,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
return false
}
// Reject file if it was created before the ignore time
val modTime = getFileModTime(path)
val modTime = fileStatus.getModificationTime()
if (modTime <= modTimeIgnoreThreshold) {
// Use <= instead of < to avoid SPARK-4518
logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
@ -293,11 +296,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
new UnionRDD(context.sparkContext, fileRDDs)
}
/** Get file mod time from cache or fetch it from the file system */
private def getFileModTime(path: Path) = {
fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
}
private def directoryPath: Path = {
if (_path == null) _path = new Path(directory)
_path
@ -319,7 +317,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
/**

View file

@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.scalatest.BeforeAndAfter
@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("binary records stream") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
}
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("file input stream - wildcard") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)
def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
val file = new File(testSubDir1, data.toString)
Files.write(data + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
logInfo(s"Created file $file")
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
@ -236,18 +231,85 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Over time, create files in the temp directory 1
val input1 = Seq(1, 2, 3, 4, 5)
input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
// Over time, create files in the temp directory 1
val input2 = Seq(6, 7, 8, 9, 10)
input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
// Verify that all the files have been read
val expectedOutput = (input1 ++ input2).map(_.toString).toSet
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
test("Modified files are correctly detected.") {
withTempDir { testDir =>
val batchDuration = Seconds(2)
val durationMs = batchDuration.milliseconds
val testPath = new Path(testDir.toURI)
val streamDir = new Path(testPath, "streaming")
val streamGlobPath = new Path(streamDir, "sub*")
val generatedDir = new Path(testPath, "generated")
val generatedSubDir = new Path(generatedDir, "subdir")
val renamedSubDir = new Path(streamDir, "subdir")
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val sparkContext = ssc.sparkContext
val hc = sparkContext.hadoopConfiguration
val fs = FileSystem.get(testPath.toUri, hc)
fs.delete(testPath, true)
fs.mkdirs(testPath)
fs.mkdirs(streamDir)
fs.mkdirs(generatedSubDir)
def write(path: Path, text: String): Unit = {
val out = fs.create(path, true)
IOUtils.write(text, out)
out.close()
}
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val existingFile = new Path(generatedSubDir, "existing")
write(existingFile, "existing\n")
val status = fs.getFileStatus(existingFile)
clock.setTime(status.getModificationTime + durationMs)
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
val outputStream = new TestOutputStream(fileStream, outputQueue)
outputStream.register()
ssc.start()
clock.advance(durationMs)
eventually(eventuallyTimeout) {
assert(1 === batchCounter.getNumCompletedBatches)
}
// create and rename the file
// put a file into the generated directory
val textPath = new Path(generatedSubDir, "renamed.txt")
write(textPath, "renamed\n")
val now = clock.getTimeMillis()
val modTime = now + durationMs / 2
fs.setTimes(textPath, modTime, modTime)
val textFilestatus = fs.getFileStatus(existingFile)
assert(textFilestatus.getModificationTime < now + durationMs)
// rename the directory under the path being scanned
fs.rename(generatedSubDir, renamedSubDir)
// move forward one window
clock.advance(durationMs)
// await the next scan completing
eventually(eventuallyTimeout) {
assert(2 === batchCounter.getNumCompletedBatches)
}
// verify that the "renamed" file is found, but not the "existing" one which is out of
// the window
assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
}
}
}
@ -416,10 +478,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
def testFileStream(newFilesOnly: Boolean) {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@ -466,8 +526,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
}

View file

@ -17,7 +17,7 @@
package org.apache.spark.streaming
import java.io.{IOException, ObjectInputStream}
import java.io.{File, IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
verifyOutput[W](output.toSeq, expectedOutput, useSet)
}
}
/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
* (originally from `SqlTestUtils`.)
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}
}