[SPARK-31793][SQL] Reduce the memory usage in file scan location metadata

### What changes were proposed in this pull request?

Currently, the data source scan node stores all of its file paths in the scan metadata. This metadata is retained when a SparkPlan is converted into SparkPlanInfo, and SparkPlanInfo is used to construct the Spark plan graph in the UI.

However, the path list can be very large (e.g. there can still be many partitions left after partition pruning), while the UI only renders up to 100 characters of the location metadata. We can therefore truncate the paths stored in the metadata to reduce memory usage.
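To make the saving concrete, here is a rough sketch of the before/after (the paths and counts are hypothetical; `Utils.buildLocationMetadata` is the helper added by this PR, shown in the diff below):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils  // private[spark], so this sketch assumes Spark-internal code

// Hypothetical example: a table with 10,000 partition directories surviving pruning.
val rootPaths = (0 until 10000).map(i => new Path(s"hdfs://warehouse/tbl/part=$i"))

// Before: every root path is concatenated into the "Location" metadata value,
// which is then carried along in SparkPlanInfo for the UI.
val before = rootPaths.mkString("[", ", ", "]")          // hundreds of KB per scan node

// After: appending stops once the string crosses the threshold,
// so the stored value stays close to what the UI can display anyway.
val after = Utils.buildLocationMetadata(rootPaths, 100)  // ~100 characters plus one path
```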

### Why are the changes needed?

Reduce unnecessary memory cost.
In a driver heap dump, the SparkPlanInfo instances are quite large, and this overhead should be avoided:
![image](https://user-images.githubusercontent.com/1097932/82642318-8f65de00-9bc2-11ea-9c9c-f05c2b0e1c49.png)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #28610 from gengliangwang/improveLocationMetadata.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
Gengliang Wang 2020-05-23 15:00:28 -07:00
parent 64ffc66496
commit 9fdc2a0801
5 changed files with 61 additions and 4 deletions

@@ -2904,6 +2904,24 @@ private[spark] object Utils extends Logging {
     props.forEach((k, v) => resultProps.put(k, v))
     resultProps
   }
+
+  /**
+   * Convert a sequence of `Path`s to a metadata string. When the length of the metadata string
+   * exceeds `stopAppendingThreshold`, stop appending paths to save memory.
+   */
+  def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = {
+    val metadata = new StringBuilder("[")
+    var index: Int = 0
+    while (index < paths.length && metadata.length < stopAppendingThreshold) {
+      if (index > 0) {
+        metadata.append(", ")
+      }
+      metadata.append(paths(index).toString)
+      index += 1
+    }
+    metadata.append("]")
+    metadata.toString
+  }
 }
 
 private[util] object CallerContext extends Logging {
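One behavior worth noting (a hedged reading of the loop above): the length check happens before each append, so the last appended path can push the result past `stopAppendingThreshold`; the result is therefore bounded by roughly the threshold plus one path and the brackets. The `pathsToMetadata` test below and the assertion in the redaction suite at the end of this commit both rely on this. Walking through one of the test cases:

```scala
// With stopAppendingThreshold = 10:
//   "["          -> length 1
//   + "path0"    -> "[path0"          length 6   (6 < 10, keep appending)
//   + ", path1"  -> "[path0, path1"   length 13  (13 >= 10, stop appending)
//   + "]"        -> "[path0, path1]"
Utils.buildLocationMetadata((0 to 4).map(i => new Path(s"path$i")), 10)  // "[path0, path1]"
```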

@@ -1301,6 +1301,14 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b")
     }
   }
+
+  test("pathsToMetadata") {
+    val paths = (0 to 4).map(i => new Path(s"path$i"))
+    assert(Utils.buildLocationMetadata(paths, 5) == "[path0]")
+    assert(Utils.buildLocationMetadata(paths, 10) == "[path0, path1]")
+    assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]")
+    assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]")
+  }
 }
 
 private class SimpleExtension

@@ -55,10 +55,12 @@ trait DataSourceScanExec extends LeafExecNode {
   // Metadata that describes more details of this scan.
   protected def metadata: Map[String, String]
 
+  protected val maxMetadataValueLength = 100
+
   override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(redact(value), 100)
+        key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength)
     }
     val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
     redact(
@@ -335,7 +337,8 @@ case class FileSourceScanExec(
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
     val locationDesc =
-      location.getClass.getSimpleName + seqToString(location.rootPaths)
+      location.getClass.getSimpleName +
+        Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
     val metadata =
       Map(
         "Format" -> relation.fileFormat.toString,

@@ -94,8 +94,10 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Logging {
   override def hashCode(): Int = getClass.hashCode()
 
   override def description(): String = {
+    val maxMetadataValueLength = 100
     val locationDesc =
-      fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
+      fileIndex.getClass.getSimpleName +
+        Utils.buildLocationMetadata(fileIndex.rootPaths, maxMetadataValueLength)
     val metadata: Map[String, String] = Map(
       "ReadSchema" -> readDataSchema.catalogString,
       "PartitionFilters" -> seqToString(partitionFilters),
@@ -105,7 +107,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Logging {
       case (key, value) =>
         val redactedValue =
           Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
-        key + ": " + StringUtils.abbreviate(redactedValue, 100)
+        key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
     }.mkString(", ")
     s"${this.getClass.getSimpleName} $metadataStr"
   }
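For context on how the two limits interact in `DataSourceScanExec.simpleString` and `FileScan.description` (a simplified, hedged illustration that skips the redaction step; the paths are hypothetical): `buildLocationMetadata` caps how much of the path list is ever materialized, while Commons Lang's `StringUtils.abbreviate` still caps what is rendered, so the displayed value never exceeds `maxMetadataValueLength` characters.

```scala
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils

val maxMetadataValueLength = 100
// Hypothetical root paths, long enough in total to exceed the limit.
val rootPaths = (0 until 50).map(i => new Path(s"hdfs://warehouse/tbl/part=$i"))

// May overshoot the threshold by up to one path (appending stops only after crossing it)...
val location = Utils.buildLocationMetadata(rootPaths, maxMetadataValueLength)
// ...but abbreviate trims the rendered value back to at most 100 characters, ending in "...".
val rendered = "Location: " + StringUtils.abbreviate(location, maxMetadataValueLength)
```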

@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
+import java.io.File
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -116,6 +118,30 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
       assert(isIncluded(df.queryExecution, "Location"))
     }
   }
+
+  test("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") {
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
+      val partitionCol = "partitionCol"
+      spark.range(10)
+        .select("id", "id")
+        .toDF("value", partitionCol)
+        .write
+        .partitionBy(partitionCol)
+        .orc(dir)
+      val paths = (0 to 9).map(i => new File(dir, s"$partitionCol=$i").getCanonicalPath)
+      val plan = spark.read.orc(paths: _*).queryExecution.executedPlan
+      val location = plan collectFirst {
+        case f: FileSourceScanExec => f.metadata("Location")
+      }
+      assert(location.isDefined)
+      // The location metadata should contain at least one path.
+      assert(location.get.contains(paths.head))
+      // If the temp path is longer than 100 characters, the metadata length should not exceed
+      // twice that length; otherwise, it should stay within 200 characters.
+      assert(location.get.length < Math.max(paths.head.length, 100) * 2)
+    }
+  }
 }
 
 /**