[SPARK-34481][SQL] Refactor dataframe reader/writer optionsWithPath logic

### What changes were proposed in this pull request?

Extract the optionsWithPath construction logic in DataFrameReader and DataFrameWriter into dedicated private helper functions.
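
As a review aid, here is the extracted behavior sketched in isolation (illustrative only: a plain `Map` stands in for Spark's internal `CaseInsensitiveMap`, and `optionsWithPaths` mirrors the new `getOptionsWithPaths` helper):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// No path: options are returned untouched. One path: stored under "path".
// Several paths: stored under "paths" as a JSON-encoded array.
def optionsWithPaths(
    extraOptions: Map[String, String],
    paths: String*): Map[String, String] = {
  if (paths.isEmpty) {
    extraOptions
  } else if (paths.length == 1) {
    extraOptions + ("path" -> paths.head)
  } else {
    val objectMapper = new ObjectMapper()
    extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
  }
}

optionsWithPaths(Map.empty)             // Map()
optionsWithPaths(Map.empty, "/a")       // Map("path" -> "/a")
optionsWithPaths(Map.empty, "/a", "/b") // Map("paths" -> """["/a","/b"]""")
```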

### Why are the changes needed?

Reduce code duplication and improve modularity: the same path-to-option mapping was previously inlined at three call sites across DataFrameReader and DataFrameWriter.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
This is a pure refactoring with no behavior change, so existing tests cover it.

Closes #31599 from yuchenhuo/SPARK-34481.

Authored-by: Yuchen Huo <yuchen.huo@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

```diff
@@ -268,14 +268,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
       source = provider, conf = sparkSession.sessionState.conf)
-    val optionsWithPath = if (paths.isEmpty) {
-      extraOptions
-    } else if (paths.length == 1) {
-      extraOptions + ("path" -> paths.head)
-    } else {
-      val objectMapper = new ObjectMapper()
-      extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
-    }
+    val optionsWithPath = getOptionsWithPaths(paths: _*)

     val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
       optionsWithPath.originalMap
@@ -308,6 +301,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }.getOrElse(loadV1Source(paths: _*))
   }

+  private def getOptionsWithPaths(paths: String*): CaseInsensitiveMap[String] = {
+    if (paths.isEmpty) {
+      extraOptions
+    } else if (paths.length == 1) {
+      extraOptions + ("path" -> paths.head)
+    } else {
+      val objectMapper = new ObjectMapper()
+      extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
+    }
+  }
+
   private def loadV1Source(paths: String*) = {
     val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
     val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior && paths.length == 1) {
```
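
As context for the unchanged finalOptions lines above: session configs are kept only for keys the user has not set, so explicit options always win. A small sketch with plain maps (the values are made up; the real code uses Spark's `CaseInsensitiveMap`):

```scala
// Hypothetical session-level and user-level options, for illustration only.
val sessionOptions = Map("mergeSchema" -> "false", "compression" -> "snappy")
val optionsWithPath = Map("path" -> "/data/events", "mergeSchema" -> "true")

// Drop session entries the user overrode, then overlay the user options.
val finalOptions =
  sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++ optionsWithPath
// Map(compression -> snappy, path -> /data/events, mergeSchema -> true)
```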

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

```diff
@@ -314,11 +314,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
       provider, df.sparkSession.sessionState.conf)
-    val optionsWithPath = if (path.isEmpty) {
-      extraOptions
-    } else {
-      extraOptions + ("path" -> path.get)
-    }
+    val optionsWithPath = getOptionsWithPath(path)

     val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
       optionsWithPath.originalMap
@@ -416,6 +412,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }
   }

+  private def getOptionsWithPath(path: Option[String]): CaseInsensitiveMap[String] = {
+    if (path.isEmpty) {
+      extraOptions
+    } else {
+      extraOptions + ("path" -> path.get)
+    }
+  }
+
   private def saveToV1Source(path: Option[String]): Unit = {
     partitioningColumns.foreach { columns =>
       extraOptions = extraOptions + (
@@ -423,11 +427,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       DataSourceUtils.encodePartitioningColumns(columns))
     }

-    val optionsWithPath = if (path.isEmpty) {
-      extraOptions
-    } else {
-      extraOptions + ("path" -> path.get)
-    }
+    val optionsWithPath = getOptionsWithPath(path)

     // Code path for data source v1.
     runCommand(df.sparkSession, "save") {
```
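
For a quick sanity check of the two writer call paths that now share the helper (the output path and formats below are only examples):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
val df = spark.range(10).toDF("id")

// save(path) forwards Some(path), so getOptionsWithPath adds "path" -> "/tmp/out".
df.write.mode("overwrite").format("parquet").save("/tmp/out")

// save() forwards None, so the options map is left as extraOptions.
df.write.format("noop").mode("overwrite").save()
```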