[SPARK-34383][SS] Optimize WAL commit phase via reducing cost of filesystem operations

### What changes were proposed in this pull request?

This PR proposes to optimize the WAL commit phase via the following changes:

* cache the offset log to avoid a FS get operation per batch
* delete directly on purge instead of employing a FS list operation

### Why are the changes needed?

There are inefficiencies in the WAL commit phase which can be easily optimized at the cost of a small amount of driver memory.

1. To provide the offset metadata to the source side (via `source.commit()`), we read the offset metadata for the previous batch from the file system, even though it was most likely written by this very driver in the previous batch. Caching it in driver memory avoids the get operation.
2. Spark calls purge against the offset log & commit log per batch, which triggers a list operation. If the previous batch's purge succeeded, the current batch only needs to handle a single batch, which can be done via a direct delete operation instead of a list operation.
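Point 1 can be sketched as a tiny bounded cache in driver memory. The sketch below is illustrative only (`OffsetCache`, `getOrLoad`, and the `load` callback are made-up names, not Spark's API); the actual change lives inside `OffsetSeqLog`:

```scala
import java.util.TreeMap

// Minimal sketch of point 1: keep the last couple of batches' offset
// metadata in driver memory so reads for source.commit() skip the FS.
// OffsetCache and its methods are illustrative names, not Spark's API.
class OffsetCache[V] {
  private val cached = new TreeMap[Long, V]()

  def put(batchId: Long, metadata: V): Unit = {
    cached.put(batchId, metadata)
    // Metadata older than (batchId - 2) is never read again; evict it
    // so the cache stays bounded regardless of how long the query runs.
    cached.headMap(batchId - 2, true).clear()
  }

  // Fall back to the (expensive) filesystem read on a cache miss.
  def getOrLoad(batchId: Long)(load: Long => Option[V]): Option[V] =
    Option(cached.get(batchId)).orElse(load(batchId))

  def size: Int = cached.size
}
```

With this eviction policy the cache never holds more than the last two batches, which matches the "cache keeps the size as 2" observation in the test section below.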

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested with additional debug logging. (Verified that the cache is used, that it keeps at most 2 entries, and that a single delete call is issued instead of a list call.)

Ran an experiment with a simple rate-to-console query. (NOTE: this wasn't done on the master branch — it was tested against Spark 2.4.x, but AFAIK the WAL commit phase hasn't changed between these versions.)

AWS S3 + S3Guard:

> before the patch

<img width="1075" alt="aws-before" src="https://user-images.githubusercontent.com/1317309/107108721-6cc54380-687d-11eb-8f10-b906b9d58397.png">

> after the patch

<img width="1071" alt="aws-after" src="https://user-images.githubusercontent.com/1317309/107108724-7189f780-687d-11eb-88da-26912ac15c85.png">

Azure:

> before the patch

<img width="1074" alt="azure-before" src="https://user-images.githubusercontent.com/1317309/107108726-75b61500-687d-11eb-8c06-9048fa10ff9a.png">

> after the patch

<img width="1069" alt="azure-after" src="https://user-images.githubusercontent.com/1317309/107108729-79e23280-687d-11eb-8d97-e7f3aeec51be.png">

Closes #31495 from HeartSaVioR/SPARK-34383.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Commit 121883b1a5 (parent f8838fe82b), authored by Jungtaek Lim (HeartSaVioR) on 2021-03-22 08:47:07 +01:00, committed by Gabor Somogyi. 3 changed files with 39 additions and 7 deletions.


```diff
@@ -277,7 +277,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
       fs.delete(path, true)
     } catch {
       case e: FileNotFoundException =>
-        logInfo(s"Failed to delete $path as it does not exist")
         // ignore if file has already been deleted
     }
   }
```
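The hunk above drops the per-file log line but keeps the behavior that makes direct deletion safe: a file that is already gone is not an error. A minimal sketch of that idempotent-delete pattern (the `Fs` trait here is a hypothetical stand-in for Hadoop's `FileSystem`, not Spark code):

```scala
import java.io.FileNotFoundException

// Hypothetical stand-in for Hadoop's FileSystem; only delete matters here.
trait Fs {
  def delete(path: String): Unit
}

// Idempotent delete: swallow FileNotFoundException, since an earlier purge
// (or a concurrent driver) may have already removed the file.
// Returns whether this call actually deleted anything.
def deleteIfExists(fs: Fs, path: String): Boolean =
  try { fs.delete(path); true }
  catch { case _: FileNotFoundException => false }
```

This is exactly what lets the purge path below fire blind deletes for batch ids that may or may not still exist on the filesystem.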


```diff
@@ -239,18 +239,33 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       .reverse
   }
 
+  private var lastPurgedBatchId: Long = -1L
+
   /**
    * Removes all the log entry earlier than thresholdBatchId (exclusive).
    */
   override def purge(thresholdBatchId: Long): Unit = {
-    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
-      .map(f => pathToBatchId(f.getPath))
-
-    for (batchId <- batchIds if batchId < thresholdBatchId) {
-      val path = batchIdToPath(batchId)
-      fileManager.delete(path)
-      logTrace(s"Removed metadata log file: $path")
-    }
+    val possibleTargetBatchIds = (lastPurgedBatchId + 1 until thresholdBatchId)
+    if (possibleTargetBatchIds.length <= 3) {
+      // avoid using list if we only need to purge at most 3 elements
+      possibleTargetBatchIds.foreach { batchId =>
+        val path = batchIdToPath(batchId)
+        fileManager.delete(path)
+        logTrace(s"Removed metadata log file: $path")
+      }
+    } else {
+      // using list to retrieve all elements
+      val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+        .map(f => pathToBatchId(f.getPath))
+
+      for (batchId <- batchIds if batchId < thresholdBatchId) {
+        val path = batchIdToPath(batchId)
+        fileManager.delete(path)
+        logTrace(s"Removed metadata log file: $path")
+      }
+    }
+
+    lastPurgedBatchId = thresholdBatchId - 1
   }
 
   /**
```
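The control flow of the new purge can be exercised against a fake file manager that simply counts filesystem calls. Everything below (`FakeFileManager`, `PurgingLog`) is a hypothetical simulation of the strategy in the hunk above, not Spark code:

```scala
import scala.collection.mutable

// Hypothetical in-memory file manager; the real code talks to HDFS/S3.
// It tracks how many list/delete round-trips the purge strategy costs.
class FakeFileManager(initial: Seq[Long]) {
  val batches: mutable.SortedSet[Long] = mutable.SortedSet(initial: _*)
  var listCalls = 0
  var deleteCalls = 0

  def list(): Seq[Long] = { listCalls += 1; batches.toSeq }
  def delete(batchId: Long): Unit = { deleteCalls += 1; batches -= batchId }
}

// Simulation of the purge logic added by this PR.
class PurgingLog(fm: FakeFileManager) {
  private var lastPurgedBatchId: Long = -1L

  def purge(thresholdBatchId: Long): Unit = {
    val candidates = lastPurgedBatchId + 1 until thresholdBatchId
    if (candidates.length <= 3) {
      // cheap path: delete directly, no list() round-trip needed
      candidates.foreach(fm.delete)
    } else {
      // cold start / large gap: fall back to listing the directory
      fm.list().filter(_ < thresholdBatchId).foreach(fm.delete)
    }
    lastPurgedBatchId = thresholdBatchId - 1
  }
}
```

Since steady-state purges advance the threshold by one batch, every purge after the first takes the direct-delete branch and never pays for a list operation.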


```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
+import java.{util => ju}
 import java.io.{InputStream, OutputStream}
 import java.nio.charset.StandardCharsets._
@@ -46,6 +47,23 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 class OffsetSeqLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
 
+  private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()
+
+  override def add(batchId: Long, metadata: OffsetSeq): Boolean = {
+    val added = super.add(batchId, metadata)
+    if (added) {
+      // cache metadata as it will be read again
+      cachedMetadata.put(batchId, metadata)
+      // we don't access metadata for (batchId - 2) batches; evict them
+      cachedMetadata.headMap(batchId - 2, true).clear()
+    }
+    added
+  }
+
+  override def get(batchId: Long): Option[OffsetSeq] = {
+    Option(cachedMetadata.get(batchId)).orElse(super.get(batchId))
+  }
+
   override protected def deserialize(in: InputStream): OffsetSeq = {
     // called inside a try-finally where the underlying stream is closed in the caller
     def parseOffset(value: String): OffsetV2 = value match {
```