diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c7db2127a6..9636fe88c7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2904,6 +2904,24 @@ private[spark] object Utils extends Logging {
     props.forEach((k, v) => resultProps.put(k, v))
     resultProps
   }
+
+  /**
+   * Convert a sequence of `Path`s to a metadata string. When the length of the metadata string
+   * exceeds `stopAppendingThreshold`, stop appending paths to save memory.
+   */
+  def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = {
+    val metadata = new StringBuilder("[")
+    var index: Int = 0
+    while (index < paths.length && metadata.length < stopAppendingThreshold) {
+      if (index > 0) {
+        metadata.append(", ")
+      }
+      metadata.append(paths(index).toString)
+      index += 1
+    }
+    metadata.append("]")
+    metadata.toString
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 931eb6b541..c9c8ae6023 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1301,6 +1301,14 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b")
     }
   }
+
+  test("pathsToMetadata") {
+    val paths = (0 to 4).map(i => new Path(s"path$i"))
+    assert(Utils.buildLocationMetadata(paths, 5) == "[path0]")
+    assert(Utils.buildLocationMetadata(paths, 10) == "[path0, path1]")
+    assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]")
+    assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]")
+  }
 }
 
 private class SimpleExtension
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 66996498ff..0ae39cf856 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -55,10 +55,12 @@ trait DataSourceScanExec extends LeafExecNode {
   // Metadata that describes more details of this scan.
   protected def metadata: Map[String, String]
 
+  protected val maxMetadataValueLength = 100
+
   override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(redact(value), 100)
+        key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength)
     }
     val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
     redact(
@@ -335,7 +337,8 @@ case class FileSourceScanExec(
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
     val locationDesc =
-      location.getClass.getSimpleName + seqToString(location.rootPaths)
+      location.getClass.getSimpleName +
+        Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
     val metadata =
       Map(
         "Format" -> relation.fileFormat.toString,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 6e05aa56f4..7e8e0ed2dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -94,8 +94,10 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
   override def hashCode(): Int = getClass.hashCode()
 
   override def description(): String = {
+    val maxMetadataValueLength = 100
     val locationDesc =
-      fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
+      fileIndex.getClass.getSimpleName +
+        Utils.buildLocationMetadata(fileIndex.rootPaths, maxMetadataValueLength)
     val metadata: Map[String, String] = Map(
       "ReadSchema" -> readDataSchema.catalogString,
       "PartitionFilters" -> seqToString(partitionFilters),
@@ -105,7 +107,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
       case (key, value) =>
         val redactedValue =
           Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
-        key + ": " + StringUtils.abbreviate(redactedValue, 100)
+        key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
     }.mkString(", ")
     s"${this.getClass.getSimpleName} $metadataStr"
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
index f1411b263c..c99be986dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
+import java.io.File
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -116,6 +118,30 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
       assert(isIncluded(df.queryExecution, "Location"))
     }
   }
+
+  test("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") {
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
+      val partitionCol = "partitionCol"
+      spark.range(10)
+        .select("id", "id")
+        .toDF("value", partitionCol)
+        .write
+        .partitionBy(partitionCol)
+        .orc(dir)
+      val paths = (0 to 9).map(i => new File(dir, s"$partitionCol=$i").getCanonicalPath)
+      val plan = spark.read.orc(paths: _*).queryExecution.executedPlan
+      val location = plan collectFirst {
+        case f: FileSourceScanExec => f.metadata("Location")
+      }
+      assert(location.isDefined)
+      // The location metadata should contain at least one path
+      assert(location.get.contains(paths.head))
+      // If the temp path length is larger than 100, the metadata length should not exceed
+      // twice that length; otherwise, the metadata length should stay within 200.
+      assert(location.get.length < Math.max(paths.head.length, 100) * 2)
+    }
+  }
 }
 
 /**
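
Note (not part of the patch): a minimal standalone sketch of the threshold semantics that `Utils.buildLocationMetadata` introduces above. It substitutes plain `String`s for `org.apache.hadoop.fs.Path` so it runs without Hadoop on the classpath; the object name and `main` demo are illustrative only. The threshold is a soft cap: the loop checks the accumulated length before each append, so the result can overshoot by one path plus the closing bracket, which is why the new test bounds the metadata length by `Math.max(paths.head.length, 100) * 2` rather than by the threshold itself.

object LocationMetadataSketch {
  // Same appending logic as the patched Utils.buildLocationMetadata,
  // minus the Hadoop Path type.
  def buildLocationMetadata(paths: Seq[String], stopAppendingThreshold: Int): String = {
    val metadata = new StringBuilder("[")
    var index = 0
    while (index < paths.length && metadata.length < stopAppendingThreshold) {
      if (index > 0) {
        metadata.append(", ")
      }
      metadata.append(paths(index))
      index += 1
    }
    metadata.append("]")
    metadata.toString
  }

  def main(args: Array[String]): Unit = {
    val paths = (0 to 4).map(i => s"path$i")
    // "[path0" has length 6 (< 10), so "path1" is still appended; the
    // resulting length 13 then stops the loop before "path2".
    println(buildLocationMetadata(paths, 10)) // prints [path0, path1]
  }
}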