From 7de49a8fc0c47fb4d2ce44e3ebe2978e002d9699 Mon Sep 17 00:00:00 2001
From: Yuchen Huo
Date: Sat, 20 Feb 2021 17:57:43 -0800
Subject: [PATCH] [SPARK-34481][SQL] Refactor dataframe reader/writer
 optionsWithPath logic

### What changes were proposed in this pull request?

Extract the optionsWithPath logic into its own helper function in
DataFrameReader and DataFrameWriter.

### Why are the changes needed?

Reduces code duplication and improves modularity.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Just some refactoring. Existing tests.

Closes #31599 from yuchenhuo/SPARK-34481.

Authored-by: Yuchen Huo
Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/sql/DataFrameReader.scala | 20 ++++++++++++--------
 .../apache/spark/sql/DataFrameWriter.scala | 20 ++++++++++----------
 2 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1a1954281c..195f4f30e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -268,14 +268,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
       source = provider, conf = sparkSession.sessionState.conf)
 
-    val optionsWithPath = if (paths.isEmpty) {
-      extraOptions
-    } else if (paths.length == 1) {
-      extraOptions + ("path" -> paths.head)
-    } else {
-      val objectMapper = new ObjectMapper()
-      extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
-    }
+    val optionsWithPath = getOptionsWithPaths(paths: _*)
 
     val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
       optionsWithPath.originalMap
@@ -308,6 +301,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }.getOrElse(loadV1Source(paths: _*))
   }
 
+  private def getOptionsWithPaths(paths: String*): CaseInsensitiveMap[String] = {
+    if (paths.isEmpty) {
+      extraOptions
+    } else if (paths.length == 1) {
+      extraOptions + ("path" -> paths.head)
+    } else {
+      val objectMapper = new ObjectMapper()
+      extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
+    }
+  }
+
   private def loadV1Source(paths: String*) = {
     val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
     val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior && paths.length == 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1dba17b451..fe6572cff5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -314,11 +314,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
       provider, df.sparkSession.sessionState.conf)
 
-    val optionsWithPath = if (path.isEmpty) {
-      extraOptions
-    } else {
-      extraOptions + ("path" -> path.get)
-    }
+    val optionsWithPath = getOptionsWithPath(path)
 
     val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
       optionsWithPath.originalMap
@@ -416,6 +412,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }
   }
 
+  private def getOptionsWithPath(path: Option[String]): CaseInsensitiveMap[String] = {
+    if (path.isEmpty) {
+      extraOptions
+    } else {
+      extraOptions + ("path" -> path.get)
+    }
+  }
+
   private def saveToV1Source(path: Option[String]): Unit = {
     partitioningColumns.foreach { columns =>
       extraOptions = extraOptions + (
@@ -423,11 +427,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         DataSourceUtils.encodePartitioningColumns(columns))
     }
 
-    val optionsWithPath = if (path.isEmpty) {
-      extraOptions
-    } else {
-      extraOptions + ("path" -> path.get)
-    }
+    val optionsWithPath = getOptionsWithPath(path)
 
     // Code path for data source v1.
     runCommand(df.sparkSession, "save") {
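
For reference, a minimal standalone sketch of the behavior the extracted helpers keep intact: no paths leave the caller's options untouched, a single path is stored under the "path" key, and multiple paths are JSON-encoded into one "paths" entry so any number of paths can travel through the flat, string-valued options map. The PathOptionsSketch wrapper and the sample mergeSchema option below are invented for illustration and are not part of the patch; only the helper body mirrors the extracted getOptionsWithPaths.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Standalone sketch, assuming spark-catalyst and jackson-databind on the
// classpath (CaseInsensitiveMap lives in an internal catalyst package).
// PathOptionsSketch is a hypothetical wrapper, not a class in Spark.
object PathOptionsSketch {

  def optionsWithPaths(
      extraOptions: CaseInsensitiveMap[String],
      paths: String*): CaseInsensitiveMap[String] = {
    if (paths.isEmpty) {
      // No load paths: the caller's options pass through unchanged.
      extraOptions
    } else if (paths.length == 1) {
      // A single path is stored directly under the "path" option.
      extraOptions + ("path" -> paths.head)
    } else {
      // Multiple paths are collapsed into one "paths" option holding a
      // JSON array, since the options map only carries string values.
      val objectMapper = new ObjectMapper()
      extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
    }
  }

  def main(args: Array[String]): Unit = {
    val base = CaseInsensitiveMap(Map("mergeSchema" -> "true"))
    println(optionsWithPaths(base))                     // only mergeSchema
    println(optionsWithPaths(base, "/tmp/a"))           // adds path -> /tmp/a
    println(optionsWithPaths(base, "/tmp/a", "/tmp/b")) // adds paths -> ["/tmp/a","/tmp/b"]
  }
}

The JSON round-trip is what lets DataFrameReader.load accept any number of paths while still funneling them through a single Map[String, String] of options.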